14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for branch implementations - tests a branch format."""
23
import bzrlib.bzrdir as bzrdir
19
24
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
20
from bzrlib.clone import copy_branch
21
25
from bzrlib.commit import commit
22
26
import bzrlib.errors as errors
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
27
from bzrlib.errors import (FileExists,
30
UninitializableFormat,
25
from bzrlib.tests import TestCase, TestCaseInTempDir
26
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
34
from bzrlib.osutils import getcwd
35
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
27
36
from bzrlib.trace import mutter
28
37
import bzrlib.transactions as transactions
29
from bzrlib.revision import NULL_REVISION
38
from bzrlib.transport import get_transport
39
from bzrlib.transport.http import HttpServer
40
from bzrlib.transport.memory import MemoryServer
41
from bzrlib.upgrade import upgrade
42
from bzrlib.workingtree import WorkingTree
31
45
# TODO: Make a branch using basis branch, and check that it
32
46
# doesn't request any files that could have been avoided, by
33
47
# hooking into the Transport.
35
class TestBranch(TestCaseInTempDir):
50
class TestCaseWithBranch(TestCaseWithTransport):
53
super(TestCaseWithBranch, self).setUp()
57
if self.branch is None:
58
self.branch = self.make_branch(None)
61
def make_branch(self, relpath):
62
repo = self.make_repository(relpath)
63
# fixme RBC 20060210 this isnt necessarily a fixable thing,
64
# Skipped is the wrong exception to raise.
66
return self.branch_format.initialize(repo.bzrdir)
67
except errors.UninitializableFormat:
68
raise TestSkipped('Uninitializable branch format')
70
def make_repository(self, relpath, shared=False):
72
url = self.get_url(relpath)
73
segments = url.split('/')
74
if segments and segments[-1] not in ('', '.'):
75
parent = '/'.join(segments[:-1])
76
t = get_transport(parent)
81
made_control = self.bzrdir_format.initialize(url)
82
return made_control.create_repository(shared=shared)
83
except UninitializableFormat:
84
raise TestSkipped("Format %s is not initializable.")
87
class TestBranch(TestCaseWithBranch):
37
89
def test_append_revisions(self):
38
90
"""Test appending more than one revision"""
39
br = Branch.initialize(u".")
91
br = self.get_branch()
40
92
br.append_revision("rev1")
41
93
self.assertEquals(br.revision_history(), ["rev1",])
42
94
br.append_revision("rev2", "rev3")
45
97
def test_fetch_revisions(self):
46
98
"""Test fetch-revision operation."""
47
from bzrlib.fetch import Fetcher
50
b1 = Branch.initialize('b1')
51
b2 = Branch.initialize('b2')
52
file(os.sep.join(['b1', 'foo']), 'w').write('hello')
53
b1.working_tree().add(['foo'], ['foo-id'])
54
b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
99
get_transport(self.get_url()).mkdir('b1')
100
get_transport(self.get_url()).mkdir('b2')
101
wt = self.make_branch_and_tree('b1')
103
b2 = self.make_branch('b2')
104
file('b1/foo', 'w').write('hello')
105
wt.add(['foo'], ['foo-id'])
106
wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
56
108
mutter('start fetch')
57
f = Fetcher(from_branch=b1, to_branch=b2)
58
eq = self.assertEquals
60
eq(f.last_revision, 'revision-1')
62
rev = b2.get_revision('revision-1')
63
tree = b2.revision_tree('revision-1')
64
eq(tree.get_file_text('foo-id'), 'hello')
66
def test_revision_tree(self):
67
b1 = Branch.initialize(u'.')
68
b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
69
tree = b1.revision_tree('revision-1')
70
tree = b1.revision_tree(None)
71
self.assertEqual(len(tree.list_files()), 0)
72
tree = b1.revision_tree(NULL_REVISION)
73
self.assertEqual(len(tree.list_files()), 0)
75
def get_unbalanced_branch_pair(self):
109
self.assertEqual((1, []), b2.fetch(b1))
111
rev = b2.repository.get_revision('revision-1')
112
tree = b2.repository.revision_tree('revision-1')
113
self.assertEqual(tree.get_file_text('foo-id'), 'hello')
115
def get_unbalanced_tree_pair(self):
76
116
"""Return two branches, a and b, with one file in a."""
78
br_a = Branch.initialize("a")
117
get_transport(self.get_url()).mkdir('a')
118
tree_a = self.make_branch_and_tree('a')
79
119
file('a/b', 'wb').write('b')
80
br_a.working_tree().add('b')
81
commit(br_a, "silly commit", rev_id='A')
83
br_b = Branch.initialize("b")
121
tree_a.commit("silly commit", rev_id='A')
123
get_transport(self.get_url()).mkdir('b')
124
tree_b = self.make_branch_and_tree('b')
125
return tree_a, tree_b
86
127
def get_balanced_branch_pair(self):
87
128
"""Returns br_a, br_b as with one commit in a, and b has a's stores."""
88
br_a, br_b = self.get_unbalanced_branch_pair()
89
br_a.push_stores(br_b)
92
def test_push_stores(self):
93
"""Copy the stores from one branch to another"""
94
br_a, br_b = self.get_unbalanced_branch_pair()
95
# ensure the revision is missing.
96
self.assertRaises(NoSuchRevision, br_b.get_revision,
97
br_a.revision_history()[0])
98
br_a.push_stores(br_b)
99
# check that b now has all the data from a's first commit.
100
rev = br_b.get_revision(br_a.revision_history()[0])
101
tree = br_b.revision_tree(br_a.revision_history()[0])
103
if tree.inventory[file_id].kind == "file":
104
tree.get_file(file_id).read()
107
def test_copy_branch(self):
108
"""Copy the stores from one branch to another"""
109
br_a, br_b = self.get_balanced_branch_pair()
110
commit(br_b, "silly commit")
129
tree_a, tree_b = self.get_unbalanced_tree_pair()
130
tree_b.branch.repository.fetch(tree_a.branch.repository)
131
return tree_a, tree_b
133
def test_clone_branch(self):
134
"""Copy the stores from one branch to another"""
135
tree_a, tree_b = self.get_balanced_branch_pair()
136
tree_b.commit("silly commit")
112
br_c = copy_branch(br_a, 'c', basis_branch=br_b)
113
self.assertEqual(br_a.revision_history(), br_c.revision_history())
138
# this fails to test that the history from a was not used.
139
dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
140
self.assertEqual(tree_a.branch.revision_history(),
141
dir_c.open_branch().revision_history())
115
def test_copy_partial(self):
143
def test_clone_partial(self):
116
144
"""Copy only part of the history of a branch."""
117
self.build_tree(['a/', 'a/one'])
118
br_a = Branch.initialize('a')
119
br_a.working_tree().add(['one'])
120
br_a.working_tree().commit('commit one', rev_id='u@d-1')
121
self.build_tree(['a/two'])
122
br_a.working_tree().add(['two'])
123
br_a.working_tree().commit('commit two', rev_id='u@d-2')
124
br_b = copy_branch(br_a, 'b', revision='u@d-1')
125
self.assertEqual(br_b.last_revision(), 'u@d-1')
126
self.assertTrue(os.path.exists('b/one'))
127
self.assertFalse(os.path.exists('b/two'))
145
# TODO: RBC 20060208 test with a revision not on revision-history.
146
# what should that behaviour be ? Emailed the list.
147
wt_a = self.make_branch_and_tree('a')
148
self.build_tree(['a/one'])
150
wt_a.commit('commit one', rev_id='1')
151
self.build_tree(['a/two'])
153
wt_a.commit('commit two', rev_id='2')
154
repo_b = self.make_repository('b')
155
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
156
br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
157
self.assertEqual(br_b.last_revision(), '1')
159
def test_sprout_partial(self):
160
# test sprouting with a prefix of the revision-history.
161
# also needs not-on-revision-history behaviour defined.
162
wt_a = self.make_branch_and_tree('a')
163
self.build_tree(['a/one'])
165
wt_a.commit('commit one', rev_id='1')
166
self.build_tree(['a/two'])
168
wt_a.commit('commit two', rev_id='2')
169
repo_b = self.make_repository('b')
170
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
171
br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
172
self.assertEqual(br_b.last_revision(), '1')
174
def test_clone_branch_nickname(self):
175
# test the nick name is preserved always
176
raise TestSkipped('XXX branch cloning is not yet tested..')
178
def test_clone_branch_parent(self):
179
# test the parent is preserved always
180
raise TestSkipped('XXX branch cloning is not yet tested..')
182
def test_sprout_branch_nickname(self):
183
# test the nick name is reset always
184
raise TestSkipped('XXX branch sprouting is not yet tested..')
186
def test_sprout_branch_parent(self):
187
source = self.make_branch('source')
188
target = source.bzrdir.sprout(self.get_url('target')).open_branch()
189
self.assertEqual(source.bzrdir.root_transport.base, target.get_parent())
129
191
def test_record_initial_ghost_merge(self):
130
192
"""A pending merge with no revision present is still a merge."""
131
branch = Branch.initialize(u'.')
132
branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
133
branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
134
rev = branch.get_revision(branch.last_revision())
193
wt = self.make_branch_and_tree('.')
195
wt.add_pending_merge('non:existent@rev--ision--0--2')
196
wt.commit('pretend to merge nonexistent-revision', rev_id='first')
197
rev = branch.repository.get_revision(branch.last_revision())
135
198
self.assertEqual(len(rev.parent_ids), 1)
136
199
# parent_sha1s is not populated now, WTF. rbc 20051003
137
200
self.assertEqual(len(rev.parent_sha1s), 0)
138
201
self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
140
203
def test_bad_revision(self):
141
branch = Branch.initialize(u'.')
142
self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
204
self.assertRaises(errors.InvalidRevisionId,
205
self.get_branch().repository.get_revision,
144
208
# TODO 20051003 RBC:
145
209
# compare the gpg-to-sign info for a commit with a ghost and
170
234
self.assertEquals(wt.pending_merges(), [])
172
236
def test_sign_existing_revision(self):
173
branch = Branch.initialize(u'.')
174
branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
237
wt = self.make_branch_and_tree('.')
239
wt.commit("base", allow_pointless=True, rev_id='A')
175
240
from bzrlib.testament import Testament
176
branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
177
self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
178
branch.revision_store.get('A', 'sig').read())
241
strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
242
branch.repository.sign_revision('A', strategy)
243
self.assertEqual(Testament.from_revision(branch.repository,
244
'A').as_short_text(),
245
branch.repository.get_signature_text('A'))
180
247
def test_store_signature(self):
181
branch = Branch.initialize(u'.')
182
branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
184
self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
248
wt = self.make_branch_and_tree('.')
250
branch.repository.store_revision_signature(
251
bzrlib.gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
252
self.assertRaises(errors.NoSuchRevision,
253
branch.repository.has_signature_for_revision_id,
255
wt.commit("base", allow_pointless=True, rev_id='A')
256
self.assertEqual('FOO',
257
branch.repository.get_signature_text('A'))
186
def test__relcontrolfilename(self):
187
branch = Branch.initialize(u'.')
188
self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
190
def test__relcontrolfilename_empty(self):
191
branch = Branch.initialize(u'.')
192
self.assertEqual('.bzr', branch._rel_controlfilename(''))
259
def test_branch_keeps_signatures(self):
260
wt = self.make_branch_and_tree('source')
261
wt.commit('A', allow_pointless=True, rev_id='A')
262
wt.branch.repository.sign_revision('A',
263
bzrlib.gpg.LoopbackGPGStrategy(None))
264
#FIXME: clone should work to urls,
265
# wt.clone should work to disks.
266
self.build_tree(['target/'])
267
d2 = wt.bzrdir.clone('target')
268
self.assertEqual(wt.branch.repository.get_signature_text('A'),
269
d2.open_repository().get_signature_text('A'))
194
271
def test_nicks(self):
195
272
"""Branch nicknames"""
197
branch = Branch.initialize('bzr.dev')
273
t = get_transport(self.get_url())
275
branch = self.make_branch('bzr.dev')
198
276
self.assertEqual(branch.nick, 'bzr.dev')
199
os.rename('bzr.dev', 'bzr.ab')
200
branch = Branch.open('bzr.ab')
277
t.move('bzr.dev', 'bzr.ab')
278
branch = Branch.open(self.get_url('bzr.ab'))
201
279
self.assertEqual(branch.nick, 'bzr.ab')
202
280
branch.nick = "Aaron's branch"
203
281
branch.nick = "Aaron's branch"
204
self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
285
branch.control_files.controlfilename("branch.conf")
205
289
self.assertEqual(branch.nick, "Aaron's branch")
206
os.rename('bzr.ab', 'integration')
207
branch = Branch.open('integration')
290
t.move('bzr.ab', 'integration')
291
branch = Branch.open(self.get_url('integration'))
208
292
self.assertEqual(branch.nick, "Aaron's branch")
209
293
branch.nick = u"\u1234"
210
294
self.assertEqual(branch.nick, u"\u1234")
212
296
def test_commit_nicks(self):
213
297
"""Nicknames are committed to the revision"""
215
branch = Branch.initialize('bzr.dev')
298
get_transport(self.get_url()).mkdir('bzr.dev')
299
wt = self.make_branch_and_tree('bzr.dev')
216
301
branch.nick = "My happy branch"
217
branch.working_tree().commit('My commit respect da nick.')
218
committed = branch.get_revision(branch.last_revision())
302
wt.commit('My commit respect da nick.')
303
committed = branch.repository.get_revision(branch.last_revision())
219
304
self.assertEqual(committed.properties["branch-nick"],
220
305
"My happy branch")
223
class TestRemote(TestCaseWithWebserver):
307
def test_create_open_branch_uses_repository(self):
309
repo = self.make_repository('.', shared=True)
310
except errors.IncompatibleFormat:
312
repo.bzrdir.root_transport.mkdir('child')
313
child_dir = self.bzrdir_format.initialize('child')
315
child_branch = self.branch_format.initialize(child_dir)
316
except errors.UninitializableFormat:
317
# branch references are not default init'able.
319
self.assertEqual(repo.bzrdir.root_transport.base,
320
child_branch.repository.bzrdir.root_transport.base)
321
child_branch = bzrlib.branch.Branch.open(self.get_url('child'))
322
self.assertEqual(repo.bzrdir.root_transport.base,
323
child_branch.repository.bzrdir.root_transport.base)
326
class ChrootedTests(TestCaseWithBranch):
327
"""A support class that provides readonly urls outside the local namespace.
329
This is done by checking if self.transport_server is a MemoryServer. if it
330
is then we are chrooted already, if it is not then an HttpServer is used
335
super(ChrootedTests, self).setUp()
336
if not self.transport_server == MemoryServer:
337
self.transport_readonly_server = HttpServer
225
339
def test_open_containing(self):
226
340
self.assertRaises(NotBranchError, Branch.open_containing,
227
self.get_remote_url(''))
341
self.get_readonly_url(''))
228
342
self.assertRaises(NotBranchError, Branch.open_containing,
229
self.get_remote_url('g/p/q'))
230
b = Branch.initialize(u'.')
231
branch, relpath = Branch.open_containing(self.get_remote_url(''))
343
self.get_readonly_url('g/p/q'))
344
branch = self.make_branch('.')
345
branch, relpath = Branch.open_containing(self.get_readonly_url(''))
232
346
self.assertEqual('', relpath)
233
branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
347
branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
234
348
self.assertEqual('g/p/q', relpath)
236
350
# TODO: rewrite this as a regular unittest, without relying on the displayed output
315
429
self.assertEqual(['lw', 'ul'], branch._calls)
318
class TestBranchTransaction(TestCaseInTempDir):
432
class TestBranchTransaction(TestCaseWithBranch):
321
435
super(TestBranchTransaction, self).setUp()
322
self.branch = Branch.initialize(u'.')
324
438
def test_default_get_transaction(self):
325
439
"""branch.get_transaction on a new branch should give a PassThrough."""
326
self.failUnless(isinstance(self.branch.get_transaction(),
440
self.failUnless(isinstance(self.get_branch().get_transaction(),
327
441
transactions.PassThroughTransaction))
329
443
def test__set_new_transaction(self):
330
self.branch._set_transaction(transactions.ReadOnlyTransaction())
444
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
332
446
def test__set_over_existing_transaction_raises(self):
333
self.branch._set_transaction(transactions.ReadOnlyTransaction())
447
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
334
448
self.assertRaises(errors.LockError,
335
self.branch._set_transaction,
449
self.get_branch()._set_transaction,
336
450
transactions.ReadOnlyTransaction())
338
452
def test_finish_no_transaction_raises(self):
339
self.assertRaises(errors.LockError, self.branch._finish_transaction)
453
self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
341
455
def test_finish_readonly_transaction_works(self):
342
self.branch._set_transaction(transactions.ReadOnlyTransaction())
343
self.branch._finish_transaction()
344
self.assertEqual(None, self.branch._transaction)
456
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
457
self.get_branch()._finish_transaction()
458
self.assertEqual(None, self.get_branch().control_files._transaction)
346
460
def test_unlock_calls_finish(self):
347
self.branch.lock_read()
461
self.get_branch().lock_read()
348
462
transaction = InstrumentedTransaction()
349
self.branch._transaction = transaction
463
self.get_branch().control_files._transaction = transaction
464
self.get_branch().unlock()
351
465
self.assertEqual(['finish'], transaction.calls)
353
467
def test_lock_read_acquires_ro_transaction(self):
354
self.branch.lock_read()
355
self.failUnless(isinstance(self.branch.get_transaction(),
468
self.get_branch().lock_read()
469
self.failUnless(isinstance(self.get_branch().get_transaction(),
356
470
transactions.ReadOnlyTransaction))
471
self.get_branch().unlock()
359
def test_lock_write_acquires_passthrough_transaction(self):
360
self.branch.lock_write()
473
def test_lock_write_acquires_write_transaction(self):
474
self.get_branch().lock_write()
361
475
# cannot use get_transaction as its magic
362
self.failUnless(isinstance(self.branch._transaction,
363
transactions.PassThroughTransaction))
367
class TestBranchPushLocations(TestCaseInTempDir):
370
super(TestBranchPushLocations, self).setUp()
371
self.branch = Branch.initialize(u'.')
476
self.failUnless(isinstance(self.get_branch().control_files._transaction,
477
transactions.WriteTransaction))
478
self.get_branch().unlock()
481
class TestBranchPushLocations(TestCaseWithBranch):
373
483
def test_get_push_location_unset(self):
374
self.assertEqual(None, self.branch.get_push_location())
484
self.assertEqual(None, self.get_branch().get_push_location())
376
486
def test_get_push_location_exact(self):
377
self.build_tree(['.bazaar/'])
378
print >> open('.bazaar/branches.conf', 'wt'), ("[%s]\n"
379
"push_location=foo" %
381
self.assertEqual("foo", self.branch.get_push_location())
487
from bzrlib.config import (branches_config_filename,
488
ensure_config_dir_exists)
489
ensure_config_dir_exists()
490
fn = branches_config_filename()
491
print >> open(fn, 'wt'), ("[%s]\n"
492
"push_location=foo" %
493
self.get_branch().base[:-1])
494
self.assertEqual("foo", self.get_branch().get_push_location())
383
496
def test_set_push_location(self):
384
self.branch.set_push_location('foo')
497
from bzrlib.config import (branches_config_filename,
498
ensure_config_dir_exists)
499
ensure_config_dir_exists()
500
fn = branches_config_filename()
501
self.get_branch().set_push_location('foo')
385
502
self.assertFileEqual("[%s]\n"
386
"push_location = foo" % os.getcwdu(),
387
'.bazaar/branches.conf')
503
"push_location = foo" % self.get_branch().base[:-1],
389
506
# TODO RBC 20051029 test getting a push location from a branch in a
390
507
# recursive section - that is, it appends the branch name.
510
class TestFormat(TestCaseWithBranch):
511
"""Tests for the format itself."""
513
def test_format_initialize_find_open(self):
514
# loopback test to check the current format initializes to itself.
515
if not self.branch_format.is_supported():
516
# unsupported formats are not loopback testable
517
# because the default open will not open them and
518
# they may not be initializable.
520
# supported formats must be able to init and open
521
t = get_transport(self.get_url())
522
readonly_t = get_transport(self.get_readonly_url())
523
made_branch = self.make_branch('.')
524
self.failUnless(isinstance(made_branch, bzrlib.branch.Branch))
526
# find it via bzrdir opening:
527
opened_control = bzrdir.BzrDir.open(readonly_t.base)
528
direct_opened_branch = opened_control.open_branch()
529
self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
530
self.assertEqual(opened_control, direct_opened_branch.bzrdir)
531
self.failUnless(isinstance(direct_opened_branch._format,
532
self.branch_format.__class__))
534
# find it via Branch.open
535
opened_branch = bzrlib.branch.Branch.open(readonly_t.base)
536
self.failUnless(isinstance(opened_branch, made_branch.__class__))
537
self.assertEqual(made_branch._format.__class__,
538
opened_branch._format.__class__)
539
# if it has a unique id string, can we probe for it ?
541
self.branch_format.get_format_string()
542
except NotImplementedError:
544
self.assertEqual(self.branch_format,
545
bzrlib.branch.BranchFormat.find_format(opened_control))