1
# Copyright (C) 2005-2012, 2016 Canonical Ltd
1
# (C) 2005 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for branch implementations - tests a branch format."""
22
branch as _mod_branch,
39
from breezy.bzr import (
40
branch as _mod_bzrbranch,
43
from breezy.sixish import (
46
from breezy.tests import (
49
from breezy.tests.http_server import HttpServer
50
from breezy.transport import memory
53
class TestTestCaseWithBranch(per_branch.TestCaseWithBranch):
55
def test_branch_format_matches_bzrdir_branch_format(self):
56
bzrdir_branch_format = self.bzrdir_format.get_branch_format()
58
self.branch_format.__class__,
59
bzrdir_branch_format.__class__)
61
def test_make_branch_gets_expected_format(self):
62
branch = self.make_branch('.')
64
self.branch_format.__class__,
65
branch._format.__class__)
68
class TestBranch(per_branch.TestCaseWithBranch):
70
def test_create_tree_with_merge(self):
71
tree, revmap = self.create_tree_with_merge()
73
self.addCleanup(tree.unlock)
74
graph = tree.branch.repository.get_graph()
75
ancestry_graph = graph.get_parent_map(
76
tree.branch.repository.all_revision_ids())
77
self.assertEqual({revmap['1']: (b'null:',),
78
revmap['2']: (revmap['1'], ),
79
revmap['1.1.1']: (revmap['1'], ),
80
revmap['3']: (revmap['2'], revmap['1.1.1'], ),
83
def test_revision_ids_are_utf8(self):
84
wt = self.make_branch_and_tree('tree')
89
br = self.get_branch()
91
br.generate_revision_history(rev3)
92
for revision_id in [rev3, rev2, rev1]:
93
self.assertIsInstance(revision_id, bytes)
94
last = br.last_revision()
95
self.assertEqual(rev3, last)
96
self.assertIsInstance(last, bytes)
97
revno, last = br.last_revision_info()
98
self.assertEqual(3, revno)
99
self.assertEqual(rev3, last)
100
self.assertIsInstance(last, bytes)
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
20
from bzrlib.clone import copy_branch
21
from bzrlib.commit import commit
22
import bzrlib.errors as errors
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
25
from bzrlib.selftest import TestCase, TestCaseInTempDir
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
27
from bzrlib.trace import mutter
28
import bzrlib.transactions as transactions
29
from bzrlib.revision import NULL_REVISION
31
# TODO: Make a branch using basis branch, and check that it
32
# doesn't request any files that could have been avoided, by
33
# hooking into the Transport.
35
class TestBranch(TestCaseInTempDir):
37
def test_append_revisions(self):
38
"""Test appending more than one revision"""
39
br = Branch.initialize(".")
40
br.append_revision("rev1")
41
self.assertEquals(br.revision_history(), ["rev1",])
42
br.append_revision("rev2", "rev3")
43
self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
102
45
def test_fetch_revisions(self):
103
46
"""Test fetch-revision operation."""
104
wt = self.make_branch_and_tree('b1')
106
self.build_tree_contents([('b1/foo', b'hello')])
108
rev1 = wt.commit('lala!', allow_pointless=False)
110
b2 = self.make_branch('b2')
111
result = b2.fetch(b1)
112
self.assertIsInstance(result, repository.FetchResult)
114
rev = b2.repository.get_revision(rev1)
115
tree = b2.repository.revision_tree(rev1)
117
self.addCleanup(tree.unlock)
118
self.assertEqual(tree.get_file_text('foo'), b'hello')
120
def get_unbalanced_tree_pair(self):
47
from bzrlib.fetch import Fetcher
50
b1 = Branch.initialize('b1')
51
b2 = Branch.initialize('b2')
52
file(os.sep.join(['b1', 'foo']), 'w').write('hello')
53
b1.add(['foo'], ['foo-id'])
54
b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
57
f = Fetcher(from_branch=b1, to_branch=b2)
58
eq = self.assertEquals
60
eq(f.last_revision, 'revision-1')
62
rev = b2.get_revision('revision-1')
63
tree = b2.revision_tree('revision-1')
64
eq(tree.get_file_text('foo-id'), 'hello')
66
def test_revision_tree(self):
67
b1 = Branch.initialize('.')
68
b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
69
tree = b1.revision_tree('revision-1')
70
tree = b1.revision_tree(None)
71
self.assertEqual(len(tree.list_files()), 0)
72
tree = b1.revision_tree(NULL_REVISION)
73
self.assertEqual(len(tree.list_files()), 0)
75
def get_unbalanced_branch_pair(self):
121
76
"""Return two branches, a and b, with one file in a."""
122
tree_a = self.make_branch_and_tree('a')
123
self.build_tree_contents([('a/b', b'b')])
125
tree_a.commit("silly commit")
127
tree_b = self.make_branch_and_tree('b')
128
return tree_a, tree_b
78
br_a = Branch.initialize("a")
79
file('a/b', 'wb').write('b')
81
commit(br_a, "silly commit", rev_id='A')
83
br_b = Branch.initialize("b")
130
86
def get_balanced_branch_pair(self):
131
87
"""Returns br_a, br_b as with one commit in a, and b has a's stores."""
132
tree_a, tree_b = self.get_unbalanced_tree_pair()
133
tree_b.branch.repository.fetch(tree_a.branch.repository)
134
return tree_a, tree_b
136
def test_clone_partial(self):
88
br_a, br_b = self.get_unbalanced_branch_pair()
89
br_a.push_stores(br_b)
92
def test_push_stores(self):
93
"""Copy the stores from one branch to another"""
94
br_a, br_b = self.get_unbalanced_branch_pair()
95
# ensure the revision is missing.
96
self.assertRaises(NoSuchRevision, br_b.get_revision,
97
br_a.revision_history()[0])
98
br_a.push_stores(br_b)
99
# check that b now has all the data from a's first commit.
100
rev = br_b.get_revision(br_a.revision_history()[0])
101
tree = br_b.revision_tree(br_a.revision_history()[0])
103
if tree.inventory[file_id].kind == "file":
104
tree.get_file(file_id).read()
107
def test_copy_branch(self):
108
"""Copy the stores from one branch to another"""
109
br_a, br_b = self.get_balanced_branch_pair()
110
commit(br_b, "silly commit")
112
br_c = copy_branch(br_a, 'c', basis_branch=br_b)
113
self.assertEqual(br_a.revision_history(), br_c.revision_history())
115
def test_copy_partial(self):
137
116
"""Copy only part of the history of a branch."""
138
# TODO: RBC 20060208 test with a revision not on revision-history.
139
# what should that behaviour be ? Emailed the list.
140
# First, make a branch with two commits.
141
wt_a = self.make_branch_and_tree('a')
142
self.build_tree(['a/one'])
144
rev1 = wt_a.commit('commit one')
117
self.build_tree(['a/', 'a/one'])
118
br_a = Branch.initialize('a')
120
br_a.working_tree().commit('commit one', rev_id='u@d-1')
145
121
self.build_tree(['a/two'])
147
wt_a.commit('commit two')
148
# Now make a copy of the repository.
149
repo_b = self.make_repository('b')
150
wt_a.branch.repository.copy_content_into(repo_b)
151
# wt_a might be a lightweight checkout, so get a hold of the actual
152
# branch (because you can't do a partial clone of a lightweight
154
branch = wt_a.branch.controldir.open_branch()
155
# Then make a branch where the new repository is, but specify a revision
156
# ID. The new branch's history will stop at the specified revision.
157
br_b = branch.clone(repo_b.controldir, revision_id=rev1)
158
self.assertEqual(rev1, br_b.last_revision())
160
def get_parented_branch(self):
161
wt_a = self.make_branch_and_tree('a')
162
self.build_tree(['a/one'])
164
rev1 = wt_a.commit('commit one')
166
branch_b = wt_a.branch.controldir.sprout(
167
'b', revision_id=rev1).open_branch()
169
urlutils.strip_segment_parameters(wt_a.branch.user_url),
170
urlutils.strip_segment_parameters(branch_b.get_parent()))
173
def test_clone_branch_nickname(self):
174
# test the nick name is preserved always
175
raise tests.TestSkipped('XXX branch cloning is not yet tested.')
177
def test_clone_branch_parent(self):
178
# test the parent is preserved always
179
branch_b = self.get_parented_branch()
180
repo_c = self.make_repository('c')
181
branch_b.repository.copy_content_into(repo_c)
182
branch_c = branch_b.clone(repo_c.controldir)
183
self.assertNotEqual(None, branch_c.get_parent())
184
self.assertEqual(branch_b.get_parent(), branch_c.get_parent())
186
# We can also set a specific parent, and it should be honored
187
random_parent = 'http://example.com/path/to/branch'
188
branch_b.set_parent(random_parent)
189
repo_d = self.make_repository('d')
190
branch_b.repository.copy_content_into(repo_d)
191
branch_d = branch_b.clone(repo_d.controldir)
192
self.assertEqual(random_parent, branch_d.get_parent())
194
def test_submit_branch(self):
195
"""Submit location can be queried and set"""
196
branch = self.make_branch('branch')
197
self.assertEqual(branch.get_submit_branch(), None)
198
branch.set_submit_branch('sftp://example.com')
199
self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
200
branch.set_submit_branch('sftp://example.net')
201
self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
203
def test_public_branch(self):
204
"""public location can be queried and set"""
205
branch = self.make_branch('branch')
206
self.assertEqual(branch.get_public_branch(), None)
207
branch.set_public_branch('sftp://example.com')
208
self.assertEqual(branch.get_public_branch(), 'sftp://example.com')
209
branch.set_public_branch('sftp://example.net')
210
self.assertEqual(branch.get_public_branch(), 'sftp://example.net')
211
branch.set_public_branch(None)
212
self.assertEqual(branch.get_public_branch(), None)
214
def test_record_initial_ghost(self):
215
"""Branches should support having ghosts."""
216
wt = self.make_branch_and_tree('.')
217
if not wt.branch.repository._format.supports_ghosts:
218
raise tests.TestNotApplicable("repository format does not "
220
wt.set_parent_ids([b'non:existent@rev--ision--0--2'],
221
allow_leftmost_as_ghost=True)
222
self.assertEqual([b'non:existent@rev--ision--0--2'],
224
rev_id = wt.commit('commit against a ghost first parent.')
225
rev = wt.branch.repository.get_revision(rev_id)
226
self.assertEqual(rev.parent_ids, [b'non:existent@rev--ision--0--2'])
123
br_a.working_tree().commit('commit two', rev_id='u@d-2')
124
br_b = copy_branch(br_a, 'b', revision='u@d-1')
125
self.assertEqual(br_b.last_revision(), 'u@d-1')
126
self.assertTrue(os.path.exists('b/one'))
127
self.assertFalse(os.path.exists('b/two'))
129
def test_record_initial_ghost_merge(self):
130
"""A pending merge with no revision present is still a merge."""
131
branch = Branch.initialize('.')
132
branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
133
branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
134
rev = branch.get_revision(branch.last_revision())
135
self.assertEqual(len(rev.parent_ids), 1)
227
136
# parent_sha1s is not populated now, WTF. rbc 20051003
228
137
self.assertEqual(len(rev.parent_sha1s), 0)
230
def test_record_two_ghosts(self):
231
"""Recording with all ghosts works."""
232
wt = self.make_branch_and_tree('.')
233
if not wt.branch.repository._format.supports_ghosts:
234
raise tests.TestNotApplicable("repository format does not "
237
b'foo@azkhazan-123123-abcabc',
238
b'wibble@fofof--20050401--1928390812',
240
allow_leftmost_as_ghost=True)
241
rev_id = wt.commit("commit from ghost base with one merge")
242
# the revision should have been committed with two parents
243
rev = wt.branch.repository.get_revision(rev_id)
244
self.assertEqual([b'foo@azkhazan-123123-abcabc',
245
b'wibble@fofof--20050401--1928390812'],
138
self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
248
140
def test_bad_revision(self):
249
self.assertRaises(errors.InvalidRevisionId,
250
self.get_branch().repository.get_revision,
253
def test_nicks_bzr(self):
254
"""Test the behaviour of branch nicks specific to bzr branches.
256
Nicknames are implicitly the name of the branch's directory, unless an
257
explicit nickname is set. That is, an explicit nickname always
258
overrides the implicit one.
261
t = self.get_transport()
262
branch = self.make_branch('bzr.dev')
263
if not isinstance(branch, _mod_bzrbranch.BzrBranch):
264
raise tests.TestNotApplicable("not a bzr branch format")
265
# The nick will be 'bzr.dev', because there is no explicit nick set.
141
branch = Branch.initialize('.')
142
self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
145
# compare the gpg-to-sign info for a commit with a ghost and
146
# an identical tree without a ghost
147
# fetch missing should rewrite the TOC of weaves to list newly available parents.
149
def test_pending_merges(self):
150
"""Tracking pending-merged revisions."""
151
b = Branch.initialize('.')
152
wt = b.working_tree()
153
self.assertEquals(wt.pending_merges(), [])
154
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
155
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
156
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
157
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
158
wt.add_pending_merge('wibble@fofof--20050401--1928390812')
159
self.assertEquals(wt.pending_merges(),
160
['foo@azkhazan-123123-abcabc',
161
'wibble@fofof--20050401--1928390812'])
162
b.working_tree().commit("commit from base with two merges")
163
rev = b.get_revision(b.revision_history()[0])
164
self.assertEquals(len(rev.parent_ids), 2)
165
self.assertEquals(rev.parent_ids[0],
166
'foo@azkhazan-123123-abcabc')
167
self.assertEquals(rev.parent_ids[1],
168
'wibble@fofof--20050401--1928390812')
169
# list should be cleared when we do a commit
170
self.assertEquals(wt.pending_merges(), [])
172
def test_sign_existing_revision(self):
173
branch = Branch.initialize('.')
174
branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
175
from bzrlib.testament import Testament
176
branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
177
self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
178
branch.revision_store.get('A', 'sig').read())
180
def test_store_signature(self):
181
branch = Branch.initialize('.')
182
branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
184
self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
186
def test__relcontrolfilename(self):
187
branch = Branch.initialize('.')
188
self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
190
def test__relcontrolfilename_empty(self):
191
branch = Branch.initialize('.')
192
self.assertEqual('.bzr', branch._rel_controlfilename(''))
194
def test_nicks(self):
195
"""Branch nicknames"""
197
branch = Branch.initialize('bzr.dev')
266
198
self.assertEqual(branch.nick, 'bzr.dev')
267
# Move the branch to a different directory, 'bzr.ab'. Now that branch
268
# will report its nick as 'bzr.ab'.
269
t.move('bzr.dev', 'bzr.ab')
270
branch = _mod_branch.Branch.open(self.get_url('bzr.ab'))
199
os.rename('bzr.dev', 'bzr.ab')
200
branch = Branch.open('bzr.ab')
271
201
self.assertEqual(branch.nick, 'bzr.ab')
272
# Set the branch nick explicitly. This will ensure there's a branch
273
# config file in the branch.
274
branch.nick = "Aaron's branch"
275
if not isinstance(branch, remote.RemoteBranch):
276
self.assertTrue(branch._transport.has("branch.conf"))
277
# Because the nick has been set explicitly, the nick is now always
278
# "Aaron's branch", regardless of directory name.
279
self.assertEqual(branch.nick, "Aaron's branch")
280
t.move('bzr.ab', 'integration')
281
branch = _mod_branch.Branch.open(self.get_url('integration'))
282
self.assertEqual(branch.nick, "Aaron's branch")
283
branch.nick = u"\u1234"
284
self.assertEqual(branch.nick, u"\u1234")
286
def test_nicks(self):
287
"""Test explicit and implicit branch nicknames.
289
A nickname is always available, whether set explicitly or not.
291
t = self.get_transport()
292
branch = self.make_branch('bzr.dev')
293
# An implicit nick name is set; what it is exactly depends on the
295
self.assertIsInstance(branch.nick, text_type)
296
# Set the branch nick explicitly.
297
branch.nick = u"Aaron's branch"
298
# Because the nick has been set explicitly, the nick is now always
300
self.assertEqual(branch.nick, u"Aaron's branch")
202
branch.nick = "Aaron's branch"
203
branch.nick = "Aaron's branch"
204
self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
205
self.assertEqual(branch.nick, "Aaron's branch")
206
os.rename('bzr.ab', 'integration')
207
branch = Branch.open('integration')
208
self.assertEqual(branch.nick, "Aaron's branch")
301
209
branch.nick = u"\u1234"
302
210
self.assertEqual(branch.nick, u"\u1234")
304
212
def test_commit_nicks(self):
305
213
"""Nicknames are committed to the revision"""
306
wt = self.make_branch_and_tree('bzr.dev')
215
branch = Branch.initialize('bzr.dev')
308
216
branch.nick = "My happy branch"
309
wt.commit('My commit respect da nick.')
310
committed = branch.repository.get_revision(branch.last_revision())
311
if branch.repository._format.supports_storing_branch_nick:
312
self.assertEqual(committed.properties["branch-nick"],
315
self.assertNotIn("branch-nick", committed.properties)
317
def test_create_colocated(self):
319
repo = self.make_repository('.', shared=True)
320
except errors.IncompatibleFormat:
322
if repo.controldir._format.colocated_branches:
323
raise tests.TestNotApplicable(
324
"control dir does not support colocated branches")
325
self.assertEqual(0, len(repo.controldir.list_branches()))
326
if not self.bzrdir_format.colocated_branches:
327
raise tests.TestNotApplicable("control dir format does not support "
328
"colocated branches")
330
child_branch1 = self.branch_format.initialize(repo.controldir,
332
except errors.UninitializableFormat:
333
# branch references are not default init'able and
334
# not all bzrdirs support colocated branches.
336
self.assertEqual(1, len(repo.controldir.list_branches()))
337
self.branch_format.initialize(repo.controldir, name='branch2')
338
self.assertEqual(2, len(repo.controldir.list_branches()))
340
def test_create_append_revisions_only(self):
342
repo = self.make_repository('.', shared=True)
343
except errors.IncompatibleFormat:
345
for val in (True, False):
347
branch = self.branch_format.initialize(repo.controldir,
348
append_revisions_only=True)
349
except (errors.UninitializableFormat, errors.UpgradeRequired):
350
# branch references are not default init'able and
351
# not all branches support append_revisions_only
353
self.assertEqual(True, branch.get_append_revisions_only())
354
repo.controldir.destroy_branch()
356
def test_get_set_append_revisions_only(self):
357
branch = self.make_branch('.')
358
if branch._format.supports_set_append_revisions_only():
359
branch.set_append_revisions_only(True)
360
self.assertTrue(branch.get_append_revisions_only())
361
branch.set_append_revisions_only(False)
362
self.assertFalse(branch.get_append_revisions_only())
364
self.assertRaises(errors.UpgradeRequired,
365
branch.set_append_revisions_only, True)
366
self.assertFalse(branch.get_append_revisions_only())
368
def test_create_open_branch_uses_repository(self):
370
repo = self.make_repository('.', shared=True)
371
except errors.IncompatibleFormat:
372
raise tests.TestNotApplicable("requires shared repository support")
373
child_transport = repo.controldir.root_transport.clone('child')
374
child_transport.mkdir('.')
376
child_dir = self.bzrdir_format.initialize_on_transport(
378
except errors.UninitializableFormat:
379
raise tests.TestNotApplicable(
380
"control dir format not initializable")
382
child_branch = self.branch_format.initialize(child_dir)
383
except errors.UninitializableFormat:
384
# branch references are not default init'able.
386
self.assertEqual(repo.controldir.root_transport.base,
387
child_branch.repository.controldir.root_transport.base)
388
child_branch = _mod_branch.Branch.open(self.get_url('child'))
389
self.assertEqual(repo.controldir.root_transport.base,
390
child_branch.repository.controldir.root_transport.base)
392
def test_format_description(self):
393
tree = self.make_branch_and_tree('tree')
394
text = tree.branch._format.get_format_description()
395
self.assertTrue(len(text))
397
def test_get_commit_builder(self):
398
branch = self.make_branch(".")
400
builder = branch.get_commit_builder([])
401
self.assertIsInstance(builder, repository.CommitBuilder)
402
branch.repository.commit_write_group()
405
def test_generate_revision_history(self):
406
"""Create a fake revision history easily."""
407
tree = self.make_branch_and_tree('.')
408
rev1 = tree.commit('foo')
410
self.addCleanup(tree.unlock)
411
graph = tree.branch.repository.get_graph()
413
graph.iter_lefthand_ancestry(
414
tree.branch.last_revision(), [revision.NULL_REVISION]))
415
rev2 = tree.commit('bar', allow_pointless=True)
416
tree.branch.generate_revision_history(rev1)
417
self.assertEqual(orig_history, list(
418
graph.iter_lefthand_ancestry(
419
tree.branch.last_revision(), [revision.NULL_REVISION])))
421
def test_generate_revision_history_NULL_REVISION(self):
422
tree = self.make_branch_and_tree('.')
423
rev1 = tree.commit('foo')
425
self.addCleanup(tree.unlock)
426
tree.branch.generate_revision_history(revision.NULL_REVISION)
427
self.assertEqual(revision.NULL_REVISION, tree.branch.last_revision())
429
def test_create_checkout(self):
430
tree_a = self.make_branch_and_tree('a')
431
branch_a = tree_a.branch
432
checkout_b = branch_a.create_checkout('b')
433
self.assertEqual(b'null:', checkout_b.last_revision())
435
rev1 = checkout_b.commit('rev1')
436
except errors.NoRoundtrippingSupport:
437
raise tests.TestNotApplicable(
438
'roundtripping between %r and %r not supported' %
439
(checkout_b.branch, checkout_b.branch.get_master_branch()))
440
self.assertEqual(rev1, branch_a.last_revision())
441
self.assertNotEqual(checkout_b.branch.base, branch_a.base)
443
checkout_c = branch_a.create_checkout('c', lightweight=True)
444
self.assertEqual(rev1, checkout_c.last_revision())
445
rev2 = checkout_c.commit('rev2')
446
self.assertEqual(rev2, branch_a.last_revision())
447
self.assertEqual(checkout_c.branch.base, branch_a.base)
449
checkout_d = branch_a.create_checkout('d', lightweight=True)
450
self.assertEqual(rev2, checkout_d.last_revision())
451
checkout_e = branch_a.create_checkout('e')
452
self.assertEqual(rev2, checkout_e.last_revision())
454
def test_create_anonymous_lightweight_checkout(self):
455
"""A lightweight checkout from a readonly branch should succeed."""
456
tree_a = self.make_branch_and_tree('a')
457
rev_id = tree_a.commit('put some content in the branch')
458
# open the branch via a readonly transport
459
url = self.get_readonly_url(urlutils.basename(tree_a.branch.base))
460
t = transport.get_transport_from_url(url)
461
if not tree_a.branch.controldir._format.supports_transport(t):
462
raise tests.TestNotApplicable("format does not support transport")
463
source_branch = _mod_branch.Branch.open(url)
464
# sanity check that the test will be valid
465
self.assertRaises((errors.LockError, errors.TransportNotPossible),
466
source_branch.lock_write)
467
checkout = source_branch.create_checkout('c', lightweight=True)
468
self.assertEqual(rev_id, checkout.last_revision())
470
def test_create_anonymous_heavyweight_checkout(self):
471
"""A regular checkout from a readonly branch should succeed."""
472
tree_a = self.make_branch_and_tree('a')
473
rev_id = tree_a.commit('put some content in the branch')
474
# open the branch via a readonly transport
475
url = self.get_readonly_url(
476
osutils.basename(tree_a.branch.base.rstrip('/')))
477
t = transport.get_transport_from_url(url)
478
if not tree_a.branch.controldir._format.supports_transport(t):
479
raise tests.TestNotApplicable("format does not support transport")
480
source_branch = _mod_branch.Branch.open(url)
481
# sanity check that the test will be valid
482
self.assertRaises((errors.LockError, errors.TransportNotPossible),
483
source_branch.lock_write)
484
checkout = source_branch.create_checkout('c')
485
self.assertEqual(rev_id, checkout.last_revision())
487
def test_heads_to_fetch(self):
488
# heads_to_fetch is a method that returns a collection of revids that
489
# need to be fetched to copy this branch into another repo. At a
490
# minimum this will include the tip.
491
# (In native formats, this is the tip + tags, but other formats may
492
# have other revs needed)
493
tree = self.make_branch_and_tree('a')
494
tree.commit('first commit')
495
rev2 = tree.commit('second commit')
496
must_fetch, should_fetch = tree.branch.heads_to_fetch()
497
self.assertTrue(rev2 in must_fetch)
499
def test_heads_to_fetch_not_null_revision(self):
500
# NULL_REVISION does not appear in the result of heads_to_fetch, even
501
# for an empty branch.
502
tree = self.make_branch_and_tree('a')
503
must_fetch, should_fetch = tree.branch.heads_to_fetch()
504
self.assertFalse(revision.NULL_REVISION in must_fetch)
505
self.assertFalse(revision.NULL_REVISION in should_fetch)
507
def test_create_memorytree(self):
508
tree = self.make_branch_and_tree('a')
509
self.assertIsInstance(tree.branch.create_memorytree(), _mod_tree.Tree)
512
class TestBranchFormat(per_branch.TestCaseWithBranch):
514
def test_branch_format_network_name(self):
515
br = self.make_branch('.')
517
network_name = format.network_name()
518
self.assertIsInstance(network_name, bytes)
519
# We want to test that the network_name matches the actual format on
520
# disk. For local branches that means that using network_name as a key
521
# in the registry gives back the same format. For remote branches we
522
# check that the network_name of the RemoteBranchFormat we have locally
523
# matches the actual format present on disk.
524
if isinstance(format, remote.RemoteBranchFormat):
526
real_branch = br._real_branch
527
self.assertEqual(real_branch._format.network_name(), network_name)
529
registry = _mod_branch.network_format_registry
530
looked_up_format = registry.get(network_name)
531
self.assertEqual(format.__class__, looked_up_format.__class__)
533
def test_get_config_calls(self):
534
# Smoke test that all branch succeed getting a config
535
br = self.make_branch('.')
537
br.get_config_stack()
540
class ChrootedTests(per_branch.TestCaseWithBranch):
541
"""A support class that provides readonly urls outside the local namespace.
543
This is done by checking if self.transport_server is a MemoryServer. if it
544
is then we are chrooted already, if it is not then an HttpServer is used
549
super(ChrootedTests, self).setUp()
550
if not self.vfs_transport_factory == memory.MemoryServer:
551
self.transport_readonly_server = HttpServer
217
branch.working_tree().commit('My commit respect da nick.')
218
committed = branch.get_revision(branch.last_revision())
219
self.assertEqual(committed.properties["branch-nick"],
223
class TestRemote(TestCaseWithWebserver):
553
225
def test_open_containing(self):
554
self.assertRaises(errors.NotBranchError,
555
_mod_branch.Branch.open_containing,
556
self.get_readonly_url(''))
557
self.assertRaises(errors.NotBranchError,
558
_mod_branch.Branch.open_containing,
559
self.get_readonly_url('g/p/q'))
560
branch = self.make_branch('.')
561
if not branch.controldir._format.supports_transport(
562
transport.get_transport_from_url(self.get_readonly_url('.'))):
563
raise tests.TestNotApplicable("format does not support transport")
564
branch, relpath = _mod_branch.Branch.open_containing(
565
self.get_readonly_url(''))
226
self.assertRaises(NotBranchError, Branch.open_containing,
227
self.get_remote_url(''))
228
self.assertRaises(NotBranchError, Branch.open_containing,
229
self.get_remote_url('g/p/q'))
230
b = Branch.initialize('.')
231
branch, relpath = Branch.open_containing(self.get_remote_url(''))
566
232
self.assertEqual('', relpath)
567
branch, relpath = _mod_branch.Branch.open_containing(
568
self.get_readonly_url('g/p/q'))
233
branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
569
234
self.assertEqual('g/p/q', relpath)
236
# TODO: rewrite this as a regular unittest, without relying on the displayed output
237
# >>> from bzrlib.commit import commit
238
# >>> bzrlib.trace.silent = True
239
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
242
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
243
# >>> br2 = ScratchBranch()
244
# >>> br2.update_revisions(br1)
246
# Added 1 inventories.
248
# >>> br2.revision_history()
250
# >>> br2.update_revisions(br1)
252
# >>> br1.text_store.total_size() == br2.text_store.total_size()
572
255
class InstrumentedTransaction(object):
586
269
def lock_read(self):
587
270
self._calls.append('lr')
588
return lock.LogicalLockResult(self.unlock)
590
272
def lock_write(self):
591
273
self._calls.append('lw')
592
return lock.LogicalLockResult(self.unlock)
594
275
def unlock(self):
595
276
self._calls.append('ul')
597
279
def do_with_read(self):
598
with self.lock_read():
601
283
def except_with_read(self):
602
with self.lock_read():
606
class TestBranchPushLocations(per_branch.TestCaseWithBranch):
287
def do_with_write(self):
291
def except_with_write(self):
295
class TestDecorators(TestCase):
297
def test_needs_read_lock(self):
298
branch = TestDecorator()
299
self.assertEqual(1, branch.do_with_read())
300
self.assertEqual(['lr', 'ul'], branch._calls)
302
def test_excepts_in_read_lock(self):
303
branch = TestDecorator()
304
self.assertRaises(RuntimeError, branch.except_with_read)
305
self.assertEqual(['lr', 'ul'], branch._calls)
307
def test_needs_write_lock(self):
308
branch = TestDecorator()
309
self.assertEqual(2, branch.do_with_write())
310
self.assertEqual(['lw', 'ul'], branch._calls)
312
def test_excepts_in_write_lock(self):
313
branch = TestDecorator()
314
self.assertRaises(RuntimeError, branch.except_with_write)
315
self.assertEqual(['lw', 'ul'], branch._calls)
318
class TestBranchTransaction(TestCaseInTempDir):
321
super(TestBranchTransaction, self).setUp()
322
self.branch = Branch.initialize('.')
324
def test_default_get_transaction(self):
325
"""branch.get_transaction on a new branch should give a PassThrough."""
326
self.failUnless(isinstance(self.branch.get_transaction(),
327
transactions.PassThroughTransaction))
329
def test__set_new_transaction(self):
330
self.branch._set_transaction(transactions.ReadOnlyTransaction())
332
def test__set_over_existing_transaction_raises(self):
333
self.branch._set_transaction(transactions.ReadOnlyTransaction())
334
self.assertRaises(errors.LockError,
335
self.branch._set_transaction,
336
transactions.ReadOnlyTransaction())
338
def test_finish_no_transaction_raises(self):
339
self.assertRaises(errors.LockError, self.branch._finish_transaction)
341
def test_finish_readonly_transaction_works(self):
342
self.branch._set_transaction(transactions.ReadOnlyTransaction())
343
self.branch._finish_transaction()
344
self.assertEqual(None, self.branch._transaction)
346
def test_unlock_calls_finish(self):
347
self.branch.lock_read()
348
transaction = InstrumentedTransaction()
349
self.branch._transaction = transaction
351
self.assertEqual(['finish'], transaction.calls)
353
def test_lock_read_acquires_ro_transaction(self):
354
self.branch.lock_read()
355
self.failUnless(isinstance(self.branch.get_transaction(),
356
transactions.ReadOnlyTransaction))
359
def test_lock_write_acquires_passthrough_transaction(self):
360
self.branch.lock_write()
361
# cannot use get_transaction as its magic
362
self.failUnless(isinstance(self.branch._transaction,
363
transactions.PassThroughTransaction))
367
class TestBranchPushLocations(TestCaseInTempDir):
370
super(TestBranchPushLocations, self).setUp()
371
self.branch = Branch.initialize('.')
608
373
def test_get_push_location_unset(self):
609
self.assertEqual(None, self.get_branch().get_push_location())
374
self.assertEqual(None, self.branch.get_push_location())
611
376
def test_get_push_location_exact(self):
612
b = self.get_branch()
613
config.LocationConfig.from_string(
614
'[%s]\npush_location=foo\n' % (b.base,), b.base, save=True)
615
self.assertEqual("foo", self.get_branch().get_push_location())
377
self.build_tree(['.bazaar/'])
378
print >> open('.bazaar/branches.conf', 'wt'), ("[%s]\n"
379
"push_location=foo" %
381
self.assertEqual("foo", self.branch.get_push_location())
617
383
def test_set_push_location(self):
618
branch = self.get_branch()
619
branch.set_push_location('foo')
620
self.assertEqual('foo', branch.get_push_location())
623
class TestChildSubmitFormats(per_branch.TestCaseWithBranch):
    """Checks of Branch.get_child_submit_format."""

    def test_get_child_submit_format_default(self):
        """With nothing configured the value is None or a str."""
        default_format = self.get_branch().get_child_submit_format()
        acceptable = (default_format is None or
                      isinstance(default_format, str))
        self.assertTrue(acceptable)

    def test_get_child_submit_format(self):
        """A value set on the config stack is read back via get_branch()."""
        writer = self.get_branch()
        writer.get_config_stack().set('child_submit_format', '10')
        # Fetch the branch again rather than reusing the object that wrote
        # the setting.
        reader = self.get_branch()
        self.assertEqual('10', reader.get_child_submit_format())
637
class TestFormat(per_branch.TestCaseWithBranch):
638
"""Tests for the format itself."""
640
def test_get_reference(self):
641
"""get_reference on all regular branches should return None."""
642
if not self.branch_format.is_supported():
643
# unsupported formats are not loopback testable
644
# because the default open will not open them and
645
# they may not be initializable.
647
made_controldir = self.make_controldir('.')
648
made_controldir.create_repository()
649
if made_controldir._format.colocated_branches:
650
# Formats that support colocated branches sometimes have a default
651
# branch that is a reference branch (such as Git). Cope with
652
# those by creating a different colocated branch.
657
made_branch = made_controldir.create_branch(name)
658
except errors.UninitializableFormat:
659
raise tests.TestNotApplicable('Uninitializable branch format')
661
self.assertEqual(None,
662
made_branch._format.get_reference(made_branch.controldir, name))
664
def test_set_reference(self):
665
"""set_reference on all regular branches should be callable."""
666
if not self.branch_format.is_supported():
667
# unsupported formats are not loopback testable
668
# because the default open will not open them and
669
# they may not be initializable.
671
this_branch = self.make_branch('this')
672
other_branch = self.make_branch('other')
674
this_branch._format.set_reference(this_branch.controldir, None,
676
except (NotImplementedError, errors.IncompatibleFormat):
680
ref = this_branch._format.get_reference(this_branch.controldir)
681
self.assertEqual(ref, other_branch.user_url)
683
def test_format_initialize_find_open(self):
684
# loopback test to check the current format initializes to itself.
685
if not self.branch_format.is_supported():
686
# unsupported formats are not loopback testable
687
# because the default open will not open them and
688
# they may not be initializable.
690
# supported formats must be able to init and open
691
t = self.get_transport()
692
readonly_t = transport.get_transport_from_url(self.get_readonly_url())
693
made_branch = self.make_branch('.')
694
self.assertIsInstance(made_branch, _mod_branch.Branch)
696
# find it via bzrdir opening:
697
opened_control = controldir.ControlDir.open(readonly_t.base)
698
direct_opened_branch = opened_control.open_branch()
699
self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
700
self.assertEqual(opened_control, direct_opened_branch.controldir)
701
self.assertIsInstance(direct_opened_branch._format,
702
self.branch_format.__class__)
704
# find it via Branch.open
705
opened_branch = _mod_branch.Branch.open(readonly_t.base)
706
self.assertIsInstance(opened_branch, made_branch.__class__)
707
self.assertEqual(made_branch._format.__class__,
708
opened_branch._format.__class__)
709
# if it has a unique id string, can we probe for it ?
711
self.branch_format.get_format_string()
712
except NotImplementedError:
714
self.assertEqual(self.branch_format,
715
opened_control.find_branch_format())
718
class TestBound(per_branch.TestCaseWithBranch):
720
def test_bind_unbind(self):
721
branch = self.make_branch('1')
722
branch2 = self.make_branch('2')
725
except errors.UpgradeRequired:
726
raise tests.TestNotApplicable('Format does not support binding')
727
self.assertTrue(branch.unbind())
728
self.assertFalse(branch.unbind())
729
self.assertIs(None, branch.get_bound_location())
731
def test_old_bound_location(self):
732
branch = self.make_branch('branch1')
734
self.assertIs(None, branch.get_old_bound_location())
735
except errors.UpgradeRequired:
736
raise tests.TestNotApplicable(
737
'Format does not store old bound locations')
738
branch2 = self.make_branch('branch2')
740
self.assertIs(None, branch.get_old_bound_location())
742
self.assertContainsRe(
743
branch.get_old_bound_location(), '\\/branch2\\/$')
745
def test_bind_diverged(self):
    """Bind a branch to one it has diverged from.

    Formats whose bind() raises UpgradeRequired are reported as not
    applicable; otherwise the bind call itself must not raise.
    """
    tree_a = self.make_branch_and_tree('tree_a')
    tree_a.commit('rev1a')
    tree_b = tree_a.controldir.sprout('tree_b').open_workingtree()
    # Commit separately on each side so the two branches diverge.
    tree_a.commit('rev2a')
    tree_b.commit('rev2b')
    try:
        tree_b.branch.bind(tree_a.branch)
    except errors.UpgradeRequired:
        raise tests.TestNotApplicable('Format does not support binding')
756
def test_unbind_clears_cached_master_branch(self):
757
"""b.unbind clears any cached value of b.get_master_branch."""
758
master = self.make_branch('master')
759
branch = self.make_branch('branch')
762
except errors.UpgradeRequired:
763
raise tests.TestNotApplicable('Format does not support binding')
764
self.addCleanup(branch.lock_write().unlock)
765
self.assertNotEqual(None, branch.get_master_branch())
767
self.assertEqual(None, branch.get_master_branch())
769
def test_bind_clears_cached_master_branch(self):
770
"""b.bind clears any cached value of b.get_master_branch."""
771
master1 = self.make_branch('master1')
772
master2 = self.make_branch('master2')
773
branch = self.make_branch('branch')
776
except errors.UpgradeRequired:
777
raise tests.TestNotApplicable('Format does not support binding')
778
self.addCleanup(branch.lock_write().unlock)
779
self.assertNotEqual(None, branch.get_master_branch())
781
self.assertEqual('.', urlutils.relative_url(self.get_url('master2'),
782
branch.get_master_branch().base))
784
def test_set_bound_location_clears_cached_master_branch(self):
785
"""b.set_bound_location clears any cached value of b.get_master_branch.
787
master1 = self.make_branch('master1')
788
master2 = self.make_branch('master2')
789
branch = self.make_branch('branch')
792
except errors.UpgradeRequired:
793
raise tests.TestNotApplicable('Format does not support binding')
794
self.addCleanup(branch.lock_write().unlock)
795
self.assertNotEqual(None, branch.get_master_branch())
796
branch.set_bound_location(self.get_url('master2'))
797
self.assertEqual('.', urlutils.relative_url(self.get_url('master2'),
798
branch.get_master_branch().base))
801
class TestStrict(per_branch.TestCaseWithBranch):
803
def test_strict_history(self):
804
tree1 = self.make_branch_and_tree('tree1')
806
tree1.branch.set_append_revisions_only(True)
807
except errors.UpgradeRequired:
808
raise tests.TestSkipped('Format does not support strict history')
809
tree1.commit('empty commit')
810
tree2 = tree1.controldir.sprout('tree2').open_workingtree()
811
tree2.commit('empty commit 2')
812
tree1.pull(tree2.branch)
813
tree1.commit('empty commit 3')
814
tree2.commit('empty commit 4')
815
self.assertRaises(errors.DivergedBranches, tree1.pull, tree2.branch)
816
tree2.merge_from_branch(tree1.branch)
817
tree2.commit('empty commit 5')
818
self.assertRaises(errors.AppendRevisionsOnlyViolation, tree1.pull,
820
tree3 = tree1.controldir.sprout('tree3').open_workingtree()
821
tree3.merge_from_branch(tree2.branch)
822
tree3.commit('empty commit 6')
823
tree2.pull(tree3.branch)
826
class TestIgnoreFallbacksParameter(per_branch.TestCaseWithBranch):
828
def make_branch_with_fallback(self):
    """Create branch 'stacked' stacked on branch 'fallback' and return it.

    :return: the 'stacked' branch.
    :raises tests.TestNotApplicable: if the format does not support
        stacking.
    """
    fallback = self.make_branch('fallback')
    if not fallback._format.supports_stacking():
        raise tests.TestNotApplicable("format does not support stacking")
    stacked = self.make_branch('stacked')
    stacked.set_stacked_on_url(fallback.base)
    # Callers read `.base` from the result, so the branch must be returned.
    return stacked
836
def test_fallbacks_not_opened(self):
    """open_branch(ignore_fallbacks=True) reports no fallback repositories."""
    stacked = self.make_branch_with_fallback()
    # Rename 'fallback' to 'moved' before the stacked branch is reopened.
    root = self.get_transport('')
    root.rename('fallback', 'moved')
    control = controldir.ControlDir.open(stacked.base)
    without_fallbacks = control.open_branch(ignore_fallbacks=True)
    self.assertEqual(
        [], without_fallbacks.repository._fallback_repositories)
843
def test_fallbacks_are_opened(self):
    """open_branch(ignore_fallbacks=False) reports one fallback repository."""
    stacked = self.make_branch_with_fallback()
    control = controldir.ControlDir.open(stacked.base)
    with_fallbacks = control.open_branch(ignore_fallbacks=False)
    self.assertLength(
        1, with_fallbacks.repository._fallback_repositories)
850
class TestBranchControlComponent(per_branch.TestCaseWithBranch):
851
"""Branch implementations adequately implement ControlComponent."""
854
br = self.make_branch('branch')
855
self.assertIsInstance(br.user_url, str)
856
self.assertEqual(br.user_url, br.user_transport.base)
857
self.assertEqual(br.control_url, br.control_transport.base)
860
class FakeShelfCreator(object):
    """Stand-in shelf creator that writes a shelf based on NULL_REVISION."""

    def __init__(self, branch):
        """Remember the branch whose repository write_shelf reads from.

        :param branch: object exposing a ``repository`` attribute.
        """
        # write_shelf reads self.branch, so it has to be stored here.
        self.branch = branch

    def write_shelf(self, shelf_file, message=None):
        """Write a shelf for the NULL_REVISION tree into shelf_file.

        :param shelf_file: passed through to ShelfCreator._write_shelf.
        :param message: accepted but not used.
        """
        tree = self.branch.repository.revision_tree(revision.NULL_REVISION)
        with transform.TransformPreview(tree) as tt:
            shelf.ShelfCreator._write_shelf(
                shelf_file, tt, revision.NULL_REVISION)
872
@contextlib.contextmanager
def skip_if_storing_uncommitted_unsupported():
    """Report StoringUncommittedNotSupported in the with-body as N/A.

    :raises tests.TestNotApplicable: when the with-body raises
        errors.StoringUncommittedNotSupported.
    """
    try:
        # A contextmanager generator must yield exactly once; the with-body
        # runs here, so its exception surfaces inside this try.
        yield
    except errors.StoringUncommittedNotSupported:
        raise tests.TestNotApplicable('Cannot store uncommitted changes.')
880
class TestUncommittedChanges(per_branch.TestCaseWithBranch):
883
def setUp(self):
    """Run the base setUp, then skip formats without store_uncommitted.

    :raises tests.TestNotApplicable: if the branch format reports that it
        does not support storing uncommitted changes.
    """
    super(TestUncommittedChanges, self).setUp()
    if not self.branch_format.supports_store_uncommitted():
        raise tests.TestNotApplicable(
            'Branch format does not support store_uncommitted')
888
def bind(self, branch, master):
891
except errors.UpgradeRequired:
892
raise tests.TestNotApplicable('Branch cannot be bound.')
894
def test_store_uncommitted(self):
895
tree = self.make_branch_and_tree('b')
897
creator = FakeShelfCreator(branch)
898
with skip_if_storing_uncommitted_unsupported():
899
self.assertIs(None, branch.get_unshelver(tree))
900
branch.store_uncommitted(creator)
901
self.assertIsNot(None, branch.get_unshelver(tree))
903
def test_store_uncommitted_bound(self):
904
tree = self.make_branch_and_tree('b')
906
master = self.make_branch('master')
907
self.bind(branch, master)
908
creator = FakeShelfCreator(tree.branch)
909
self.assertIs(None, tree.branch.get_unshelver(tree))
910
self.assertIs(None, master.get_unshelver(tree))
911
tree.branch.store_uncommitted(creator)
912
self.assertIsNot(None, master.get_unshelver(tree))
914
def test_store_uncommitted_already_stored(self):
    """A second store_uncommitted with a creator raises ChangesAlreadyStored."""
    target = self.make_branch('b')
    # NOTE(review): only the first store call is placed under the skip
    # guard; confirm the intended extent of the with-body.
    with skip_if_storing_uncommitted_unsupported():
        target.store_uncommitted(FakeShelfCreator(target))
    self.assertRaises(
        errors.ChangesAlreadyStored,
        target.store_uncommitted,
        FakeShelfCreator(target))
921
def test_store_uncommitted_none(self):
    """After store_uncommitted(None), get_unshelver(None) returns None."""
    target = self.make_branch('b')
    # NOTE(review): only the first store call is placed under the skip
    # guard; confirm the intended extent of the with-body.
    with skip_if_storing_uncommitted_unsupported():
        target.store_uncommitted(FakeShelfCreator(target))
    target.store_uncommitted(None)
    unshelver = target.get_unshelver(None)
    self.assertIs(None, unshelver)
928
def test_get_unshelver(self):
929
tree = self.make_branch_and_tree('tree')
931
self.build_tree_contents([('tree/file', b'contents1')])
933
with skip_if_storing_uncommitted_unsupported():
934
tree.store_uncommitted()
935
unshelver = tree.branch.get_unshelver(tree)
936
self.assertIsNot(None, unshelver)
938
def test_get_unshelver_bound(self):
939
tree = self.make_branch_and_tree('tree')
941
self.build_tree_contents([('tree/file', b'contents1')])
943
with skip_if_storing_uncommitted_unsupported():
944
tree.store_uncommitted()
945
branch = self.make_branch('branch')
946
self.bind(branch, tree.branch)
947
unshelver = branch.get_unshelver(tree)
948
self.assertIsNot(None, unshelver)
951
class TestFormatMetadata(per_branch.TestCaseWithBranch):
    """Checks on metadata reported by the branch format."""

    def test_stores_revno(self):
        """stores_revno() must return a value found in (True, False)."""
        answer = self.branch_format.stores_revno()
        self.assertIn(answer, (True, False))
384
self.branch.set_push_location('foo')
385
self.assertFileEqual("[%s]\n"
386
"push_location = foo" % os.getcwdu(),
387
'.bazaar/branches.conf')
389
# TODO RBC 20051029 test getting a push location from a branch in a
390
# recursive section - that is, it appends the branch name.