/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/blackbox/test_push.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-02-05 21:53:22 UTC
  • mfrom: (3928.4.3 bug_295826)
  • Revision ID: pqm@pqm.ubuntu.com-20090205215322-dlhyepy2fid5i7w6
(jam) Minor tweak to setup.py documentation for bug #295826

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2005, 2007, 2008 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
18
"""Black-box tests for bzr push."""
19
19
 
 
20
import os
20
21
import re
21
22
 
22
23
from bzrlib import (
23
 
    branch,
24
 
    bzrdir,
25
24
    errors,
26
 
    osutils,
27
 
    tests,
28
25
    transport,
29
 
    uncommit,
30
26
    urlutils,
31
 
    workingtree
32
 
    )
33
 
from bzrlib.repofmt import knitrepo
34
 
from bzrlib.tests import (
35
 
    blackbox,
36
 
    http_server,
37
 
    test_foreign,
38
 
    test_server,
39
 
    )
40
 
from bzrlib.transport import memory
41
 
 
42
 
 
43
 
def load_tests(standard_tests, module, loader):
44
 
    """Multiply tests for the push command."""
45
 
    result = loader.suiteClass()
46
 
 
47
 
    # one for each kind of change
48
 
    changes_tests, remaining_tests = tests.split_suite_by_condition(
49
 
        standard_tests, tests.condition_isinstance((
50
 
                TestPushStrictWithChanges,
51
 
                )))
52
 
    changes_scenarios = [
53
 
        ('uncommitted',
54
 
         dict(_changes_type= '_uncommitted_changes')),
55
 
        ('pending-merges',
56
 
         dict(_changes_type= '_pending_merges')),
57
 
        ('out-of-sync-trees',
58
 
         dict(_changes_type= '_out_of_sync_trees')),
59
 
        ]
60
 
    tests.multiply_tests(changes_tests, changes_scenarios, result)
61
 
    # No parametrization for the remaining tests
62
 
    result.addTests(remaining_tests)
63
 
 
64
 
    return result
65
 
 
66
 
 
67
 
class TestPush(tests.TestCaseWithTransport):
68
 
 
69
 
    def test_push_error_on_vfs_http(self):
70
 
        """ pushing a branch to a HTTP server fails cleanly. """
71
 
        # the trunk is published on a web server
72
 
        self.transport_readonly_server = http_server.HttpServer
73
 
        self.make_branch('source')
74
 
        public_url = self.get_readonly_url('target')
75
 
        self.run_bzr_error(['http does not support mkdir'],
76
 
                           ['push', public_url],
77
 
                           working_dir='source')
 
27
    )
 
28
from bzrlib.branch import Branch
 
29
from bzrlib.bzrdir import BzrDirMetaFormat1
 
30
from bzrlib.osutils import abspath
 
31
from bzrlib.repofmt.knitrepo import RepositoryFormatKnit1
 
32
from bzrlib.tests.blackbox import ExternalBase
 
33
from bzrlib.tests.http_server import HttpServer
 
34
from bzrlib.transport.memory import MemoryServer, MemoryTransport
 
35
from bzrlib.uncommit import uncommit
 
36
from bzrlib.urlutils import local_path_from_url
 
37
from bzrlib.workingtree import WorkingTree
 
38
 
 
39
 
 
40
class TestPush(ExternalBase):
78
41
 
79
42
    def test_push_remember(self):
80
43
        """Push changes from one branch to another and test push location."""
98
61
        self.assertEqual(None, branch_b.get_push_location())
99
62
 
100
63
        # test push for failure without push location set
101
 
        out = self.run_bzr('push', working_dir='branch_a', retcode=3)
 
64
        os.chdir('branch_a')
 
65
        out = self.run_bzr('push', retcode=3)
102
66
        self.assertEquals(out,
103
67
                ('','bzr: ERROR: No push location known or specified.\n'))
104
68
 
105
69
        # test not remembered if cannot actually push
106
 
        self.run_bzr('push path/which/doesnt/exist',
107
 
                     working_dir='branch_a', retcode=3)
108
 
        out = self.run_bzr('push', working_dir='branch_a', retcode=3)
 
70
        self.run_bzr('push ../path/which/doesnt/exist', retcode=3)
 
71
        out = self.run_bzr('push', retcode=3)
109
72
        self.assertEquals(
110
73
                ('', 'bzr: ERROR: No push location known or specified.\n'),
111
74
                out)
112
75
 
113
76
        # test implicit --remember when no push location set, push fails
114
 
        out = self.run_bzr('push ../branch_b',
115
 
                           working_dir='branch_a', retcode=3)
 
77
        out = self.run_bzr('push ../branch_b', retcode=3)
116
78
        self.assertEquals(out,
117
79
                ('','bzr: ERROR: These branches have diverged.  '
118
 
                 'See "bzr help diverged-branches" for more information.\n'))
119
 
        self.assertEquals(osutils.abspath(branch_a.get_push_location()),
120
 
                          osutils.abspath(branch_b.bzrdir.root_transport.base))
 
80
                    'Try using "merge" and then "push".\n'))
 
81
        self.assertEquals(abspath(branch_a.get_push_location()),
 
82
                          abspath(branch_b.bzrdir.root_transport.base))
121
83
 
122
84
        # test implicit --remember after resolving previous failure
123
 
        uncommit.uncommit(branch=branch_b, tree=tree_b)
 
85
        uncommit(branch=branch_b, tree=tree_b)
124
86
        transport.delete('branch_b/c')
125
 
        out, err = self.run_bzr('push', working_dir='branch_a')
 
87
        out, err = self.run_bzr('push')
126
88
        path = branch_a.get_push_location()
127
89
        self.assertEquals(out,
128
 
                          'Using saved push location: %s\n'
129
 
                          % urlutils.local_path_from_url(path))
 
90
                          'Using saved push location: %s\n' 
 
91
                          'Pushed up to revision 2.\n'
 
92
                          % local_path_from_url(path))
130
93
        self.assertEqual(err,
131
 
                         'All changes applied successfully.\n'
132
 
                         'Pushed up to revision 2.\n')
 
94
                         'All changes applied successfully.\n')
133
95
        self.assertEqual(path,
134
96
                         branch_b.bzrdir.root_transport.base)
135
97
        # test explicit --remember
136
 
        self.run_bzr('push ../branch_c --remember', working_dir='branch_a')
 
98
        self.run_bzr('push ../branch_c --remember')
137
99
        self.assertEquals(branch_a.get_push_location(),
138
100
                          branch_c.bzrdir.root_transport.base)
139
 
 
 
101
    
140
102
    def test_push_without_tree(self):
141
103
        # bzr push from a branch that does not have a checkout should work.
142
104
        b = self.make_branch('.')
143
105
        out, err = self.run_bzr('push pushed-location')
144
106
        self.assertEqual('', out)
145
107
        self.assertEqual('Created new branch.\n', err)
146
 
        b2 = branch.Branch.open('pushed-location')
 
108
        b2 = Branch.open('pushed-location')
147
109
        self.assertEndsWith(b2.base, 'pushed-location/')
148
110
 
149
111
    def test_push_new_branch_revision_count(self):
150
 
        # bzr push of a branch with revisions to a new location
151
 
        # should print the number of revisions equal to the length of the
 
112
        # bzr push of a branch with revisions to a new location 
 
113
        # should print the number of revisions equal to the length of the 
152
114
        # local branch.
153
115
        t = self.make_branch_and_tree('tree')
154
116
        self.build_tree(['tree/file'])
155
117
        t.add('file')
156
118
        t.commit('commit 1')
157
 
        out, err = self.run_bzr('push -d tree pushed-to')
 
119
        os.chdir('tree')
 
120
        out, err = self.run_bzr('push pushed-to')
 
121
        os.chdir('..')
158
122
        self.assertEqual('', out)
159
123
        self.assertEqual('Created new branch.\n', err)
160
124
 
161
125
    def test_push_only_pushes_history(self):
162
126
        # Knit branches should only push the history for the current revision.
163
 
        format = bzrdir.BzrDirMetaFormat1()
164
 
        format.repository_format = knitrepo.RepositoryFormatKnit1()
 
127
        format = BzrDirMetaFormat1()
 
128
        format.repository_format = RepositoryFormatKnit1()
165
129
        shared_repo = self.make_repository('repo', format=format, shared=True)
166
130
        shared_repo.set_make_working_trees(True)
167
131
 
168
132
        def make_shared_tree(path):
169
133
            shared_repo.bzrdir.root_transport.mkdir(path)
170
134
            shared_repo.bzrdir.create_branch_convenience('repo/' + path)
171
 
            return workingtree.WorkingTree.open('repo/' + path)
 
135
            return WorkingTree.open('repo/' + path)
172
136
        tree_a = make_shared_tree('a')
173
137
        self.build_tree(['repo/a/file'])
174
138
        tree_a.add('file')
189
153
 
190
154
        # Now that we have a repository with shared files, make sure
191
155
        # that things aren't copied out by a 'push'
192
 
        self.run_bzr('push ../../push-b', working_dir='repo/b')
193
 
        pushed_tree = workingtree.WorkingTree.open('push-b')
 
156
        os.chdir('repo/b')
 
157
        self.run_bzr('push ../../push-b')
 
158
        pushed_tree = WorkingTree.open('../../push-b')
194
159
        pushed_repo = pushed_tree.branch.repository
195
160
        self.assertFalse(pushed_repo.has_revision('a-1'))
196
161
        self.assertFalse(pushed_repo.has_revision('a-2'))
198
163
 
199
164
    def test_push_funky_id(self):
200
165
        t = self.make_branch_and_tree('tree')
201
 
        self.build_tree(['tree/filename'])
 
166
        os.chdir('tree')
 
167
        self.build_tree(['filename'])
202
168
        t.add('filename', 'funky-chars<>%&;"\'')
203
169
        t.commit('commit filename')
204
 
        self.run_bzr('push -d tree new-tree')
 
170
        self.run_bzr('push ../new-tree')
205
171
 
206
172
    def test_push_dash_d(self):
207
173
        t = self.make_branch_and_tree('from')
209
175
                message='first commit')
210
176
        self.run_bzr('push -d from to-one')
211
177
        self.failUnlessExists('to-one')
212
 
        self.run_bzr('push -d %s %s'
 
178
        self.run_bzr('push -d %s %s' 
213
179
            % tuple(map(urlutils.local_path_to_url, ['from', 'to-two'])))
214
180
        self.failUnlessExists('to-two')
215
181
 
216
 
    def test_push_smart_non_stacked_streaming_acceptance(self):
217
 
        self.setup_smart_server_with_call_log()
218
 
        t = self.make_branch_and_tree('from')
219
 
        t.commit(allow_pointless=True, message='first commit')
220
 
        self.reset_smart_call_log()
221
 
        self.run_bzr(['push', self.get_url('to-one')], working_dir='from')
222
 
        # This figure represents the amount of work to perform this use case. It
223
 
        # is entirely ok to reduce this number if a test fails due to rpc_count
224
 
        # being too low. If rpc_count increases, more network roundtrips have
225
 
        # become necessary for this use case. Please do not adjust this number
226
 
        # upwards without agreement from bzr's network support maintainers.
227
 
        self.assertLength(9, self.hpss_calls)
228
 
 
229
 
    def test_push_smart_stacked_streaming_acceptance(self):
230
 
        self.setup_smart_server_with_call_log()
231
 
        parent = self.make_branch_and_tree('parent', format='1.9')
232
 
        parent.commit(message='first commit')
233
 
        local = parent.bzrdir.sprout('local').open_workingtree()
234
 
        local.commit(message='local commit')
235
 
        self.reset_smart_call_log()
236
 
        self.run_bzr(['push', '--stacked', '--stacked-on', '../parent',
237
 
            self.get_url('public')], working_dir='local')
238
 
        # This figure represents the amount of work to perform this use case. It
239
 
        # is entirely ok to reduce this number if a test fails due to rpc_count
240
 
        # being too low. If rpc_count increases, more network roundtrips have
241
 
        # become necessary for this use case. Please do not adjust this number
242
 
        # upwards without agreement from bzr's network support maintainers.
243
 
        self.assertLength(14, self.hpss_calls)
244
 
        remote = branch.Branch.open('public')
245
 
        self.assertEndsWith(remote.get_stacked_on_url(), '/parent')
246
 
 
247
 
    def test_push_smart_tags_streaming_acceptance(self):
248
 
        self.setup_smart_server_with_call_log()
249
 
        t = self.make_branch_and_tree('from')
250
 
        rev_id = t.commit(allow_pointless=True, message='first commit')
251
 
        t.branch.tags.set_tag('new-tag', rev_id)
252
 
        self.reset_smart_call_log()
253
 
        self.run_bzr(['push', self.get_url('to-one')], working_dir='from')
254
 
        # This figure represents the amount of work to perform this use case. It
255
 
        # is entirely ok to reduce this number if a test fails due to rpc_count
256
 
        # being too low. If rpc_count increases, more network roundtrips have
257
 
        # become necessary for this use case. Please do not adjust this number
258
 
        # upwards without agreement from bzr's network support maintainers.
259
 
        self.assertLength(11, self.hpss_calls)
260
 
 
261
 
    def test_push_smart_incremental_acceptance(self):
262
 
        self.setup_smart_server_with_call_log()
263
 
        t = self.make_branch_and_tree('from')
264
 
        rev_id1 = t.commit(allow_pointless=True, message='first commit')
265
 
        rev_id2 = t.commit(allow_pointless=True, message='second commit')
266
 
        self.run_bzr(
267
 
            ['push', self.get_url('to-one'), '-r1'], working_dir='from')
268
 
        self.reset_smart_call_log()
269
 
        self.run_bzr(['push', self.get_url('to-one')], working_dir='from')
270
 
        # This figure represents the amount of work to perform this use case. It
271
 
        # is entirely ok to reduce this number if a test fails due to rpc_count
272
 
        # being too low. If rpc_count increases, more network roundtrips have
273
 
        # become necessary for this use case. Please do not adjust this number
274
 
        # upwards without agreement from bzr's network support maintainers.
275
 
        self.assertLength(11, self.hpss_calls)
276
 
 
277
 
    def test_push_smart_with_default_stacking_url_path_segment(self):
278
 
        # If the default stacked-on location is a path element then branches
279
 
        # we push there over the smart server are stacked and their
280
 
        # stacked_on_url is that exact path segment. Added to nail bug 385132.
281
 
        self.setup_smart_server_with_call_log()
282
 
        self.make_branch('stack-on', format='1.9')
283
 
        self.make_bzrdir('.').get_config().set_default_stack_on(
284
 
            '/stack-on')
285
 
        self.make_branch('from', format='1.9')
286
 
        out, err = self.run_bzr(['push', '-d', 'from', self.get_url('to')])
287
 
        b = branch.Branch.open(self.get_url('to'))
288
 
        self.assertEqual('/extra/stack-on', b.get_stacked_on_url())
289
 
 
290
 
    def test_push_smart_with_default_stacking_relative_path(self):
291
 
        # If the default stacked-on location is a relative path then branches
292
 
        # we push there over the smart server are stacked and their
293
 
        # stacked_on_url is a relative path. Added to nail bug 385132.
294
 
        self.setup_smart_server_with_call_log()
295
 
        self.make_branch('stack-on', format='1.9')
296
 
        self.make_bzrdir('.').get_config().set_default_stack_on('stack-on')
297
 
        self.make_branch('from', format='1.9')
298
 
        out, err = self.run_bzr(['push', '-d', 'from', self.get_url('to')])
299
 
        b = branch.Branch.open(self.get_url('to'))
300
 
        self.assertEqual('../stack-on', b.get_stacked_on_url())
301
 
 
302
182
    def create_simple_tree(self):
303
183
        tree = self.make_branch_and_tree('tree')
304
184
        self.build_tree(['tree/a'])
315
195
                           working_dir='tree')
316
196
        self.run_bzr('push ../new/tree --create-prefix',
317
197
                     working_dir='tree')
318
 
        new_tree = workingtree.WorkingTree.open('new/tree')
 
198
        new_tree = WorkingTree.open('new/tree')
319
199
        self.assertEqual(tree.last_revision(), new_tree.last_revision())
320
200
        self.failUnlessExists('new/tree/a')
321
201
 
335
215
        self.run_bzr('push --use-existing-dir ../target',
336
216
                     working_dir='tree')
337
217
 
338
 
        new_tree = workingtree.WorkingTree.open('target')
 
218
        new_tree = WorkingTree.open('target')
339
219
        self.assertEqual(tree.last_revision(), new_tree.last_revision())
340
220
        # The push should have created target/a
341
221
        self.failUnlessExists('target/a')
342
222
 
343
 
    def test_push_use_existing_into_empty_bzrdir(self):
344
 
        """'bzr push --use-existing-dir' into a dir with an empty .bzr dir
345
 
        fails.
346
 
        """
347
 
        tree = self.create_simple_tree()
348
 
        self.build_tree(['target/', 'target/.bzr/'])
349
 
        self.run_bzr_error(
350
 
            ['Target directory ../target already contains a .bzr directory, '
351
 
             'but it is not valid.'],
352
 
            'push ../target --use-existing-dir', working_dir='tree')
353
 
 
354
223
    def test_push_onto_repo(self):
355
224
        """We should be able to 'bzr push' into an existing bzrdir."""
356
225
        tree = self.create_simple_tree()
362
231
        # Pushing onto an existing bzrdir will create a repository and
363
232
        # branch as needed, but will only create a working tree if there was
364
233
        # no BzrDir before.
365
 
        self.assertRaises(errors.NoWorkingTree,
366
 
                          workingtree.WorkingTree.open, 'repo')
367
 
        new_branch = branch.Branch.open('repo')
 
234
        self.assertRaises(errors.NoWorkingTree, WorkingTree.open, 'repo')
 
235
        new_branch = Branch.open('repo')
368
236
        self.assertEqual(tree.last_revision(), new_branch.last_revision())
369
237
 
370
238
    def test_push_onto_just_bzrdir(self):
389
257
 
390
258
        self.run_bzr('push -r1 ../to', working_dir='from')
391
259
 
392
 
        tree_to = workingtree.WorkingTree.open('to')
 
260
        tree_to = WorkingTree.open('to')
393
261
        repo_to = tree_to.branch.repository
394
262
        self.assertTrue(repo_to.has_revision('from-1'))
395
263
        self.assertFalse(repo_to.has_revision('from-2'))
396
264
        self.assertEqual(tree_to.branch.last_revision_info()[1], 'from-1')
397
265
 
398
266
        self.run_bzr_error(
399
 
            ['bzr: ERROR: bzr push --revision '
400
 
             'takes exactly one revision identifier\n'],
 
267
            "bzr: ERROR: bzr push --revision takes one value.\n",
401
268
            'push -r0..2 ../to', working_dir='from')
402
269
 
403
270
    def create_trunk_and_feature_branch(self):
404
271
        # We have a mainline
405
272
        trunk_tree = self.make_branch_and_tree('target',
406
 
            format='1.9')
 
273
            format='development')
407
274
        trunk_tree.commit('mainline')
408
275
        # and a branch from it
409
276
        branch_tree = self.make_branch_and_tree('branch',
410
 
            format='1.9')
 
277
            format='development')
411
278
        branch_tree.pull(trunk_tree.branch)
412
279
        branch_tree.branch.set_parent(trunk_tree.branch.base)
413
280
        # with some work on it
416
283
 
417
284
    def assertPublished(self, branch_revid, stacked_on):
418
285
        """Assert that the branch 'published' has been published correctly."""
419
 
        published_branch = branch.Branch.open('published')
 
286
        published_branch = Branch.open('published')
420
287
        # The published branch refers to the mainline
421
288
        self.assertEqual(stacked_on, published_branch.get_stacked_on_url())
422
289
        # and the branch's work was pushed
444
311
        self.assertEqual('', out)
445
312
        self.assertEqual('Created new stacked branch referring to %s.\n' %
446
313
            trunk_tree.branch.base, err)
447
 
        self.assertPublished(branch_tree.last_revision(),
448
 
                             trunk_tree.branch.base)
 
314
        self.assertPublished(branch_tree.last_revision(), trunk_tree.branch.base)
449
315
 
450
316
    def test_push_new_branch_stacked_uses_parent_public(self):
451
317
        """Pushing a new branch with --stacked creates a stacked branch."""
452
318
        trunk_tree, branch_tree = self.create_trunk_and_feature_branch()
453
319
        # the trunk is published on a web server
454
 
        self.transport_readonly_server = http_server.HttpServer
455
 
        trunk_public = self.make_branch('public_trunk', format='1.9')
 
320
        self.transport_readonly_server = HttpServer
 
321
        trunk_public = self.make_branch('public_trunk', format='development')
456
322
        trunk_public.pull(trunk_tree.branch)
457
323
        trunk_public_url = self.get_readonly_url('public_trunk')
458
324
        trunk_tree.branch.set_public_branch(trunk_public_url)
467
333
 
468
334
    def test_push_new_branch_stacked_no_parent(self):
469
335
        """Pushing with --stacked and no parent branch errors."""
470
 
        branch = self.make_branch_and_tree('branch', format='1.9')
 
336
        branch = self.make_branch_and_tree('branch', format='development')
471
337
        # now we do a stacked push, which should fail as the place to refer to
472
338
        # cannot be determined.
473
339
        out, err = self.run_bzr_error(
484
350
        self.assertContainsRe(err,
485
351
                              'Using default stacking branch stack_on at .*')
486
352
 
487
 
    def test_push_stacks_with_default_stacking_if_target_is_stackable(self):
488
 
        self.make_branch('stack_on', format='1.6')
489
 
        self.make_bzrdir('.').get_config().set_default_stack_on('stack_on')
490
 
        self.make_branch('from', format='pack-0.92')
491
 
        out, err = self.run_bzr('push -d from to')
492
 
        b = branch.Branch.open('to')
493
 
        self.assertEqual('../stack_on', b.get_stacked_on_url())
494
 
 
495
 
    def test_push_does_not_change_format_with_default_if_target_cannot(self):
496
 
        self.make_branch('stack_on', format='pack-0.92')
497
 
        self.make_bzrdir('.').get_config().set_default_stack_on('stack_on')
498
 
        self.make_branch('from', format='pack-0.92')
499
 
        out, err = self.run_bzr('push -d from to')
500
 
        b = branch.Branch.open('to')
501
 
        self.assertRaises(errors.UnstackableBranchFormat, b.get_stacked_on_url)
502
 
 
503
353
    def test_push_doesnt_create_broken_branch(self):
504
354
        """Pushing a new standalone branch works even when there's a default
505
355
        stacking policy at the destination.
545
395
        # subsequent log is accurate
546
396
        self.assertNotContainsRe(out, 'rev1')
547
397
 
548
 
    def test_push_from_subdir(self):
549
 
        t = self.make_branch_and_tree('tree')
550
 
        self.build_tree(['tree/dir/', 'tree/dir/file'])
551
 
        t.add('dir', 'dir/file')
552
 
        t.commit('r1')
553
 
        out, err = self.run_bzr('push ../../pushloc', working_dir='tree/dir')
554
 
        self.assertEqual('', out)
555
 
        self.assertEqual('Created new branch.\n', err)
556
 
 
557
 
 
558
 
class RedirectingMemoryTransport(memory.MemoryTransport):
 
398
 
 
399
class RedirectingMemoryTransport(MemoryTransport):
559
400
 
560
401
    def mkdir(self, relpath, mode=None):
 
402
        from bzrlib.trace import mutter
 
403
        mutter('cwd: %r, rel: %r, abs: %r' % (self._cwd, relpath, abspath))
561
404
        if self._cwd == '/source/':
562
405
            raise errors.RedirectRequested(self.abspath(relpath),
563
406
                                           self.abspath('../target'),
570
413
            return super(RedirectingMemoryTransport, self).mkdir(
571
414
                relpath, mode)
572
415
 
573
 
    def get(self, relpath):
574
 
        if self.clone(relpath)._cwd == '/infinite-loop/':
575
 
            raise errors.RedirectRequested(self.abspath(relpath),
576
 
                                           self.abspath('../infinite-loop'),
577
 
                                           is_permanent=True)
578
 
        else:
579
 
            return super(RedirectingMemoryTransport, self).get(relpath)
580
 
 
581
416
    def _redirected_to(self, source, target):
582
417
        # We do accept redirections
583
418
        return transport.get_transport(target)
584
419
 
585
420
 
586
 
class RedirectingMemoryServer(memory.MemoryServer):
 
421
class RedirectingMemoryServer(MemoryServer):
587
422
 
588
 
    def start_server(self):
 
423
    def setUp(self):
589
424
        self._dirs = {'/': None}
590
425
        self._files = {}
591
426
        self._locks = {}
599
434
        result._locks = self._locks
600
435
        return result
601
436
 
602
 
    def stop_server(self):
 
437
    def tearDown(self):
603
438
        transport.unregister_transport(self._scheme, self._memory_factory)
604
439
 
605
440
 
606
 
class TestPushRedirect(tests.TestCaseWithTransport):
 
441
class TestPushRedirect(ExternalBase):
607
442
 
608
443
    def setUp(self):
609
 
        tests.TestCaseWithTransport.setUp(self)
 
444
        ExternalBase.setUp(self)
610
445
        self.memory_server = RedirectingMemoryServer()
611
 
        self.start_server(self.memory_server)
 
446
        self.memory_server.setUp()
 
447
        self.addCleanup(self.memory_server.tearDown)
 
448
 
612
449
        # Make the branch and tree that we'll be pushing.
613
450
        t = self.make_branch_and_tree('tree')
614
451
        self.build_tree(['tree/file'])
624
461
        destination_url = self.memory_server.get_url() + 'source'
625
462
        self.run_bzr(['push', '-d', 'tree', destination_url])
626
463
 
627
 
        local_revision = branch.Branch.open('tree').last_revision()
628
 
        remote_revision = branch.Branch.open(
 
464
        local_revision = Branch.open('tree').last_revision()
 
465
        remote_revision = Branch.open(
629
466
            self.memory_server.get_url() + 'target').last_revision()
630
467
        self.assertEqual(remote_revision, local_revision)
631
468
 
639
476
             % re.escape(destination_url)],
640
477
            ['push', '-d', 'tree', destination_url], retcode=3)
641
478
        self.assertEqual('', out)
642
 
 
643
 
 
644
 
class TestPushStrictMixin(object):
645
 
 
646
 
    def make_local_branch_and_tree(self):
647
 
        self.tree = self.make_branch_and_tree('local')
648
 
        self.build_tree_contents([('local/file', 'initial')])
649
 
        self.tree.add('file')
650
 
        self.tree.commit('adding file', rev_id='added')
651
 
        self.build_tree_contents([('local/file', 'modified')])
652
 
        self.tree.commit('modify file', rev_id='modified')
653
 
 
654
 
    def set_config_push_strict(self, value):
655
 
        # set config var (any of bazaar.conf, locations.conf, branch.conf
656
 
        # should do)
657
 
        conf = self.tree.branch.get_config()
658
 
        conf.set_user_option('push_strict', value)
659
 
 
660
 
    _default_command = ['push', '../to']
661
 
    _default_wd = 'local'
662
 
    _default_errors = ['Working tree ".*/local/" has uncommitted '
663
 
                       'changes \(See bzr status\)\.',]
664
 
    _default_additional_error = 'Use --no-strict to force the push.\n'
665
 
    _default_additional_warning = 'Uncommitted changes will not be pushed.'
666
 
 
667
 
 
668
 
    def assertPushFails(self, args):
669
 
        out, err = self.run_bzr_error(self._default_errors,
670
 
                                      self._default_command + args,
671
 
                                      working_dir=self._default_wd, retcode=3)
672
 
        self.assertContainsRe(err, self._default_additional_error)
673
 
 
674
 
    def assertPushSucceeds(self, args, with_warning=False, revid_to_push=None):
675
 
        if with_warning:
676
 
            error_regexes = self._default_errors
677
 
        else:
678
 
            error_regexes = []
679
 
        out, err = self.run_bzr(self._default_command + args,
680
 
                                working_dir=self._default_wd,
681
 
                                error_regexes=error_regexes)
682
 
        if with_warning:
683
 
            self.assertContainsRe(err, self._default_additional_warning)
684
 
        else:
685
 
            self.assertNotContainsRe(err, self._default_additional_warning)
686
 
        branch_from = branch.Branch.open(self._default_wd)
687
 
        if revid_to_push is None:
688
 
            revid_to_push = branch_from.last_revision()
689
 
        branch_to = branch.Branch.open('to')
690
 
        repo_to = branch_to.repository
691
 
        self.assertTrue(repo_to.has_revision(revid_to_push))
692
 
        self.assertEqual(revid_to_push, branch_to.last_revision())
693
 
 
694
 
 
695
 
 
696
 
class TestPushStrictWithoutChanges(tests.TestCaseWithTransport,
697
 
                                   TestPushStrictMixin):
698
 
 
699
 
    def setUp(self):
700
 
        super(TestPushStrictWithoutChanges, self).setUp()
701
 
        self.make_local_branch_and_tree()
702
 
 
703
 
    def test_push_default(self):
704
 
        self.assertPushSucceeds([])
705
 
 
706
 
    def test_push_strict(self):
707
 
        self.assertPushSucceeds(['--strict'])
708
 
 
709
 
    def test_push_no_strict(self):
710
 
        self.assertPushSucceeds(['--no-strict'])
711
 
 
712
 
    def test_push_config_var_strict(self):
713
 
        self.set_config_push_strict('true')
714
 
        self.assertPushSucceeds([])
715
 
 
716
 
    def test_push_config_var_no_strict(self):
717
 
        self.set_config_push_strict('false')
718
 
        self.assertPushSucceeds([])
719
 
 
720
 
 
721
 
class TestPushStrictWithChanges(tests.TestCaseWithTransport,
722
 
                                TestPushStrictMixin):
723
 
 
724
 
    _changes_type = None # Set by load_tests
725
 
 
726
 
    def setUp(self):
727
 
        super(TestPushStrictWithChanges, self).setUp()
728
 
        # Apply the changes defined in load_tests: one of _uncommitted_changes,
729
 
        # _pending_merges or _out_of_sync_trees
730
 
        getattr(self, self._changes_type)()
731
 
 
732
 
    def _uncommitted_changes(self):
733
 
        self.make_local_branch_and_tree()
734
 
        # Make a change without committing it
735
 
        self.build_tree_contents([('local/file', 'in progress')])
736
 
 
737
 
    def _pending_merges(self):
738
 
        self.make_local_branch_and_tree()
739
 
        # Create 'other' branch containing a new file
740
 
        other_bzrdir = self.tree.bzrdir.sprout('other')
741
 
        other_tree = other_bzrdir.open_workingtree()
742
 
        self.build_tree_contents([('other/other-file', 'other')])
743
 
        other_tree.add('other-file')
744
 
        other_tree.commit('other commit', rev_id='other')
745
 
        # Merge and revert, leaving a pending merge
746
 
        self.tree.merge_from_branch(other_tree.branch)
747
 
        self.tree.revert(filenames=['other-file'], backups=False)
748
 
 
749
 
    def _out_of_sync_trees(self):
750
 
        self.make_local_branch_and_tree()
751
 
        self.run_bzr(['checkout', '--lightweight', 'local', 'checkout'])
752
 
        # Make a change and commit it
753
 
        self.build_tree_contents([('local/file', 'modified in local')])
754
 
        self.tree.commit('modify file', rev_id='modified-in-local')
755
 
        # Exercise commands from the checkout directory
756
 
        self._default_wd = 'checkout'
757
 
        self._default_errors = ["Working tree is out of date, please run"
758
 
                                " 'bzr update'\.",]
759
 
 
760
 
    def test_push_default(self):
761
 
        self.assertPushSucceeds([], with_warning=True)
762
 
 
763
 
    def test_push_with_revision(self):
764
 
        self.assertPushSucceeds(['-r', 'revid:added'], revid_to_push='added')
765
 
 
766
 
    def test_push_no_strict(self):
767
 
        self.assertPushSucceeds(['--no-strict'])
768
 
 
769
 
    def test_push_strict_with_changes(self):
770
 
        self.assertPushFails(['--strict'])
771
 
 
772
 
    def test_push_respect_config_var_strict(self):
773
 
        self.set_config_push_strict('true')
774
 
        self.assertPushFails([])
775
 
 
776
 
    def test_push_bogus_config_var_ignored(self):
777
 
        self.set_config_push_strict("I don't want you to be strict")
778
 
        self.assertPushSucceeds([], with_warning=True)
779
 
 
780
 
    def test_push_no_strict_command_line_override_config(self):
781
 
        self.set_config_push_strict('yES')
782
 
        self.assertPushFails([])
783
 
        self.assertPushSucceeds(['--no-strict'])
784
 
 
785
 
    def test_push_strict_command_line_override_config(self):
786
 
        self.set_config_push_strict('oFF')
787
 
        self.assertPushFails(['--strict'])
788
 
        self.assertPushSucceeds([])
789
 
 
790
 
 
791
 
class TestPushForeign(blackbox.ExternalBase):
792
 
 
793
 
    def setUp(self):
794
 
        super(TestPushForeign, self).setUp()
795
 
        test_foreign.register_dummy_foreign_for_test(self)
796
 
 
797
 
    def make_dummy_builder(self, relpath):
798
 
        builder = self.make_branch_builder(
799
 
            relpath, format=test_foreign.DummyForeignVcsDirFormat())
800
 
        builder.build_snapshot('revid', None,
801
 
            [('add', ('', 'TREE_ROOT', 'directory', None)),
802
 
             ('add', ('foo', 'fooid', 'file', 'bar'))])
803
 
        return builder
804
 
 
805
 
    def test_no_roundtripping(self):
806
 
        target_branch = self.make_dummy_builder('dp').get_branch()
807
 
        source_tree = self.make_branch_and_tree("dc")
808
 
        output, error = self.run_bzr("push -d dc dp", retcode=3)
809
 
        self.assertEquals("", output)
810
 
        self.assertEquals(error, "bzr: ERROR: It is not possible to losslessly"
811
 
            " push to dummy. You may want to use dpush instead.\n")