14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for branch implementations - tests a branch format."""
22
import bzrlib.branch as branch
20
23
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
21
from bzrlib.clone import copy_branch
22
24
from bzrlib.commit import commit
23
25
import bzrlib.errors as errors
24
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
26
from bzrlib.errors import (FileExists,
29
UninitializableFormat,
26
33
from bzrlib.osutils import getcwd
27
from bzrlib.tests import TestCase, TestCaseInTempDir
28
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
34
from bzrlib.revision import NULL_REVISION
35
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
29
36
from bzrlib.trace import mutter
30
37
import bzrlib.transactions as transactions
31
from bzrlib.revision import NULL_REVISION
38
from bzrlib.transport import get_transport
39
from bzrlib.transport.http import HttpServer
40
from bzrlib.transport.memory import MemoryServer
41
from bzrlib.upgrade import upgrade
42
from bzrlib.workingtree import WorkingTree
33
44
# TODO: Make a branch using basis branch, and check that it
34
45
# doesn't request any files that could have been avoided, by
35
46
# hooking into the Transport.
37
class TestBranch(TestCaseInTempDir):
49
class TestCaseWithBranch(TestCaseWithTransport):
52
super(TestCaseWithBranch, self).setUp()
56
if self.branch is None:
57
self.branch = self.make_branch(None)
60
def make_branch(self, relpath):
62
url = self.get_url(relpath)
63
segments = url.split('/')
64
if segments and segments[-1] not in ('', '.'):
65
parent = '/'.join(segments[:-1])
66
t = get_transport(parent)
71
return self.branch_format.initialize(url)
72
except UninitializableFormat:
73
raise TestSkipped("Format %s is not initializable.")
76
class TestBranch(TestCaseWithBranch):
39
78
def test_append_revisions(self):
40
79
"""Test appending more than one revision"""
41
br = Branch.initialize(u".")
80
br = self.get_branch()
42
81
br.append_revision("rev1")
43
82
self.assertEquals(br.revision_history(), ["rev1",])
44
83
br.append_revision("rev2", "rev3")
47
86
def test_fetch_revisions(self):
48
87
"""Test fetch-revision operation."""
49
88
from bzrlib.fetch import Fetcher
52
b1 = Branch.initialize('b1')
53
b2 = Branch.initialize('b2')
89
get_transport(self.get_url()).mkdir('b1')
90
get_transport(self.get_url()).mkdir('b2')
91
b1 = self.make_branch('b1')
92
b2 = self.make_branch('b2')
93
wt = WorkingTree.create(b1, 'b1')
54
94
file('b1/foo', 'w').write('hello')
55
b1.working_tree().add(['foo'], ['foo-id'])
56
b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
95
wt.add(['foo'], ['foo-id'])
96
wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
58
98
mutter('start fetch')
59
99
f = Fetcher(from_branch=b1, to_branch=b2)
60
100
eq = self.assertEquals
61
101
eq(f.count_copied, 1)
62
eq(f.last_revision, 'revision-1')
102
eq(f._last_revision, 'revision-1')
64
rev = b2.get_revision('revision-1')
65
tree = b2.revision_tree('revision-1')
104
rev = b2.repository.get_revision('revision-1')
105
tree = b2.repository.revision_tree('revision-1')
66
106
eq(tree.get_file_text('foo-id'), 'hello')
68
108
def test_revision_tree(self):
    """Check the trees the repository builds for real and null revisions.

    One pointless revision is committed; the repository is then asked for
    the tree of that revision, of ``None`` and of ``NULL_REVISION``.  The
    last two must list no files.
    """
    source_branch = self.get_branch()
    working = WorkingTree.create(source_branch, '.')
    working.commit('lala!', rev_id='revision-1', allow_pointless=True)
    # The committed revision's tree is only built, never inspected.
    source_branch.repository.revision_tree('revision-1')
    for null_id in (None, NULL_REVISION):
        empty_tree = source_branch.repository.revision_tree(null_id)
        self.assertEqual(len(empty_tree.list_files()), 0)
77
def get_unbalanced_branch_pair(self):
118
def get_unbalanced_tree_pair(self):
78
119
"""Return two branches, a and b, with one file in a."""
80
br_a = Branch.initialize("a")
120
get_transport(self.get_url()).mkdir('a')
121
br_a = self.make_branch('a')
122
tree_a = WorkingTree.create(br_a, 'a')
81
123
file('a/b', 'wb').write('b')
82
br_a.working_tree().add('b')
83
commit(br_a, "silly commit", rev_id='A')
85
br_b = Branch.initialize("b")
125
tree_a.commit("silly commit", rev_id='A')
127
get_transport(self.get_url()).mkdir('b')
128
br_b = self.make_branch('b')
129
tree_b = WorkingTree.create(br_b, 'b')
130
return tree_a, tree_b
88
132
def get_balanced_branch_pair(self):
    """Return (tree_a, tree_b) after pushing a's stores into b.

    The pair comes from get_unbalanced_tree_pair(); push_stores() is then
    called on tree_a's branch with tree_b's branch as the target.
    """
    source_tree, target_tree = self.get_unbalanced_tree_pair()
    source_branch = source_tree.branch
    source_branch.push_stores(target_tree.branch)
    return source_tree, target_tree
94
138
def test_push_stores(self):
95
139
"""Copy the stores from one branch to another"""
96
br_a, br_b = self.get_unbalanced_branch_pair()
140
tree_a, tree_b = self.get_unbalanced_tree_pair()
97
143
# ensure the revision is missing.
98
self.assertRaises(NoSuchRevision, br_b.get_revision,
144
self.assertRaises(NoSuchRevision, br_b.repository.get_revision,
99
145
br_a.revision_history()[0])
100
146
br_a.push_stores(br_b)
101
147
# check that b now has all the data from a's first commit.
102
rev = br_b.get_revision(br_a.revision_history()[0])
103
tree = br_b.revision_tree(br_a.revision_history()[0])
148
rev = br_b.repository.get_revision(br_a.revision_history()[0])
149
tree = br_b.repository.revision_tree(br_a.revision_history()[0])
104
150
for file_id in tree:
105
151
if tree.inventory[file_id].kind == "file":
106
152
tree.get_file(file_id).read()
109
def test_copy_branch(self):
154
def test_clone_branch(self):
110
155
"""Copy the stores from one branch to another"""
111
br_a, br_b = self.get_balanced_branch_pair()
112
commit(br_b, "silly commit")
156
tree_a, tree_b = self.get_balanced_branch_pair()
157
tree_b.commit("silly commit")
114
br_c = copy_branch(br_a, 'c', basis_branch=br_b)
115
self.assertEqual(br_a.revision_history(), br_c.revision_history())
159
br_c = tree_a.branch.clone('c', basis_branch=tree_b.branch)
160
self.assertEqual(tree_a.branch.revision_history(),
161
br_c.revision_history())
117
def test_copy_partial(self):
163
def test_clone_partial(self):
118
164
"""Copy only part of the history of a branch."""
119
self.build_tree(['a/', 'a/one'])
120
br_a = Branch.initialize('a')
121
br_a.working_tree().add(['one'])
122
br_a.working_tree().commit('commit one', rev_id='u@d-1')
165
get_transport(self.get_url()).mkdir('a')
166
br_a = self.make_branch('a')
167
wt = WorkingTree.create(br_a, "a")
168
self.build_tree(['a/one'])
170
wt.commit('commit one', rev_id='u@d-1')
123
171
self.build_tree(['a/two'])
124
br_a.working_tree().add(['two'])
125
br_a.working_tree().commit('commit two', rev_id='u@d-2')
126
br_b = copy_branch(br_a, 'b', revision='u@d-1')
173
wt.commit('commit two', rev_id='u@d-2')
174
br_b = br_a.clone('b', revision='u@d-1')
127
175
self.assertEqual(br_b.last_revision(), 'u@d-1')
128
176
self.assertTrue(os.path.exists('b/one'))
129
177
self.assertFalse(os.path.exists('b/two'))
131
179
def test_record_initial_ghost_merge(self):
    """A pending merge with no revision present is still a merge."""
    ghost_id = 'non:existent@rev--ision--0--2'
    br = self.get_branch()
    tree = WorkingTree.create(br, ".")
    tree.add_pending_merge(ghost_id)
    tree.commit('pretend to merge nonexistent-revision', rev_id='first')
    tip = br.repository.get_revision(br.last_revision())
    # The ghost is recorded as the only parent of the new revision.
    self.assertEqual(len(tip.parent_ids), 1)
    # parent_sha1s is not populated now, WTF. rbc 20051003
    self.assertEqual(len(tip.parent_sha1s), 0)
    self.assertEqual(tip.parent_ids[0], ghost_id)
142
191
def test_bad_revision(self):
143
branch = Branch.initialize(u'.')
144
self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
192
self.assertRaises(errors.InvalidRevisionId,
193
self.get_branch().repository.get_revision,
146
196
# TODO 20051003 RBC:
147
197
# compare the gpg-to-sign info for a commit with a ghost and
172
222
self.assertEquals(wt.pending_merges(), [])
174
224
def test_sign_existing_revision(self):
175
branch = Branch.initialize(u'.')
176
branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
225
branch = self.get_branch()
226
wt = WorkingTree.create(branch, ".")
227
wt.commit("base", allow_pointless=True, rev_id='A')
177
228
from bzrlib.testament import Testament
178
branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
179
self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
180
branch.revision_store.get('A', 'sig').read())
229
strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
230
branch.repository.sign_revision('A', strategy)
231
self.assertEqual(Testament.from_revision(branch.repository,
232
'A').as_short_text(),
233
branch.repository.revision_store.get('A',
182
236
def test_store_signature(self):
183
branch = Branch.initialize(u'.')
184
branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
186
self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
188
def test__relcontrolfilename(self):
189
branch = Branch.initialize(u'.')
190
self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
192
def test__relcontrolfilename_empty(self):
193
branch = Branch.initialize(u'.')
194
self.assertEqual('.bzr', branch._rel_controlfilename(''))
237
branch = self.get_branch()
238
branch.repository.store_revision_signature(
239
bzrlib.gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
240
self.assertEqual('FOO',
241
branch.repository.revision_store.get('A',
244
def test_branch_keeps_signatures(self):
245
wt = self.make_branch_and_tree('source')
246
wt.commit('A', allow_pointless=True, rev_id='A')
247
wt.branch.repository.sign_revision('A',
248
bzrlib.gpg.LoopbackGPGStrategy(None))
249
#FIXME: clone should work to urls,
250
# wt.clone should work to disks.
251
self.build_tree(['target/'])
252
b2 = wt.branch.clone('target')
253
self.assertEqual(wt.branch.repository.revision_store.get('A',
255
b2.repository.revision_store.get('A',
258
def test_upgrade_preserves_signatures(self):
259
# this is in the current test format
260
wt = self.make_branch_and_tree('source')
261
wt.commit('A', allow_pointless=True, rev_id='A')
262
wt.branch.repository.sign_revision('A',
263
bzrlib.gpg.LoopbackGPGStrategy(None))
264
old_signature = wt.branch.repository.revision_store.get('A',
267
wt = WorkingTree(wt.basedir)
268
new_signature = wt.branch.repository.revision_store.get('A',
270
self.assertEqual(old_signature, new_signature)
196
272
def test_nicks(self):
197
273
"""Branch nicknames"""
199
branch = Branch.initialize('bzr.dev')
274
t = get_transport(self.get_url())
276
branch = self.make_branch('bzr.dev')
200
277
self.assertEqual(branch.nick, 'bzr.dev')
201
os.rename('bzr.dev', 'bzr.ab')
202
branch = Branch.open('bzr.ab')
278
t.move('bzr.dev', 'bzr.ab')
279
branch = Branch.open(self.get_url('bzr.ab'))
203
280
self.assertEqual(branch.nick, 'bzr.ab')
204
281
branch.nick = "Aaron's branch"
205
282
branch.nick = "Aaron's branch"
206
self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
286
branch.control_files.controlfilename("branch.conf")
207
290
self.assertEqual(branch.nick, "Aaron's branch")
208
os.rename('bzr.ab', 'integration')
209
branch = Branch.open('integration')
291
t.move('bzr.ab', 'integration')
292
branch = Branch.open(self.get_url('integration'))
210
293
self.assertEqual(branch.nick, "Aaron's branch")
211
294
branch.nick = u"\u1234"
212
295
self.assertEqual(branch.nick, u"\u1234")
214
297
def test_commit_nicks(self):
    """Nicknames are committed to the revision"""
    nickname = "My happy branch"
    get_transport(self.get_url()).mkdir('bzr.dev')
    br = self.make_branch('bzr.dev')
    br.nick = nickname
    tree = WorkingTree.create(br, 'bzr.dev')
    tree.commit('My commit respect da nick.')
    # The nick set above must appear in the committed revision properties.
    tip = br.repository.get_revision(br.last_revision())
    self.assertEqual(tip.properties["branch-nick"], nickname)
225
class TestRemote(TestCaseWithWebserver):
307
def test_no_ancestry_weave(self):
    """Creating a branch must not leave a .bzr/ancestry.weave file."""
    # We no longer need to create the ancestry.weave file
    # since it is *never* used.
    created = Branch.create('.')
    self.failIfExists('.bzr/ancestry.weave')
314
class ChrootedTests(TestCaseWithBranch):
315
"""A support class that provides readonly urls outside the local namespace.
317
This is done by checking if self.transport_server is a MemoryServer. if it
318
is then we are chrooted already, if it is not then an HttpServer is used
323
super(ChrootedTests, self).setUp()
324
if not self.transport_server == MemoryServer:
325
self.transport_readonly_server = HttpServer
227
327
def test_open_containing(self):
228
328
self.assertRaises(NotBranchError, Branch.open_containing,
229
self.get_remote_url(''))
329
self.get_readonly_url(''))
230
330
self.assertRaises(NotBranchError, Branch.open_containing,
231
self.get_remote_url('g/p/q'))
232
b = Branch.initialize(u'.')
233
branch, relpath = Branch.open_containing(self.get_remote_url(''))
331
self.get_readonly_url('g/p/q'))
333
branch = self.branch_format.initialize(self.get_url())
334
except UninitializableFormat:
335
raise TestSkipped("Format %s is not initializable.")
336
branch, relpath = Branch.open_containing(self.get_readonly_url(''))
234
337
self.assertEqual('', relpath)
235
branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
338
branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
236
339
self.assertEqual('g/p/q', relpath)
238
341
# TODO: rewrite this as a regular unittest, without relying on the displayed output
317
420
self.assertEqual(['lw', 'ul'], branch._calls)
320
class TestBranchTransaction(TestCaseInTempDir):
423
class TestBranchTransaction(TestCaseWithBranch):
323
426
super(TestBranchTransaction, self).setUp()
324
self.branch = Branch.initialize(u'.')
326
429
def test_default_get_transaction(self):
    """branch.get_transaction on a new branch should give a PassThrough."""
    current = self.get_branch().get_transaction()
    expected_type = transactions.PassThroughTransaction
    self.failUnless(isinstance(current, expected_type))
331
434
def test__set_new_transaction(self):
    """Setting a transaction on the test branch must not raise."""
    br = self.get_branch()
    br._set_transaction(transactions.ReadOnlyTransaction())
334
437
def test__set_over_existing_transaction_raises(self):
    """A second _set_transaction call must raise errors.LockError."""
    br = self.get_branch()
    br._set_transaction(transactions.ReadOnlyTransaction())
    # A transaction is already set, so setting another one must fail.
    second = transactions.ReadOnlyTransaction()
    self.assertRaises(errors.LockError, br._set_transaction, second)
340
443
def test_finish_no_transaction_raises(self):
    """_finish_transaction with nothing set must raise errors.LockError."""
    finish = self.get_branch()._finish_transaction
    self.assertRaises(errors.LockError, finish)
343
446
def test_finish_readonly_transaction_works(self):
    """Finishing a read-only transaction resets the stored one to None."""
    br = self.get_branch()
    br._set_transaction(transactions.ReadOnlyTransaction())
    br._finish_transaction()
    # The transaction slot is read from the branch's control_files.
    self.assertEqual(None, br.control_files._transaction)
348
451
def test_unlock_calls_finish(self):
    """unlock() must leave exactly one 'finish' call on the transaction."""
    br = self.get_branch()
    br.lock_read()
    spy = InstrumentedTransaction()
    # Swap in the instrumented transaction so unlock() acts on it.
    br.control_files._transaction = spy
    br.unlock()
    self.assertEqual(['finish'], spy.calls)
355
458
def test_lock_read_acquires_ro_transaction(self):
    """A read-locked branch must report a ReadOnlyTransaction.

    The lock is released in a ``finally`` clause so that a failing
    assertion does not leave the branch locked.
    """
    br = self.get_branch()
    br.lock_read()
    try:
        self.failUnless(isinstance(br.get_transaction(),
                                   transactions.ReadOnlyTransaction))
    finally:
        br.unlock()
361
464
def test_lock_write_acquires_passthrough_transaction(self):
    """A write-locked branch must hold a PassThroughTransaction.

    The lock is released in a ``finally`` clause so that a failing
    assertion does not leave the branch write-locked.
    """
    br = self.get_branch()
    br.lock_write()
    try:
        # Inspect the stored transaction directly: per the original note,
        # get_transaction() cannot be used here because it is "magic".
        self.failUnless(isinstance(br.control_files._transaction,
                                   transactions.PassThroughTransaction))
    finally:
        br.unlock()
472
class TestBranchPushLocations(TestCaseWithBranch):
375
474
def test_get_push_location_unset(self):
    """get_push_location() on the test branch must return None."""
    location = self.get_branch().get_push_location()
    self.assertEqual(None, location)
378
477
def test_get_push_location_exact(self):
379
478
from bzrlib.config import (branches_config_filename,
382
481
fn = branches_config_filename()
383
482
print >> open(fn, 'wt'), ("[%s]\n"
384
483
"push_location=foo" %
386
self.assertEqual("foo", self.branch.get_push_location())
484
self.get_branch().base[:-1])
485
self.assertEqual("foo", self.get_branch().get_push_location())
388
487
def test_set_push_location(self):
389
488
from bzrlib.config import (branches_config_filename,
390
489
ensure_config_dir_exists)
391
490
ensure_config_dir_exists()
392
491
fn = branches_config_filename()
393
self.branch.set_push_location('foo')
492
self.get_branch().set_push_location('foo')
394
493
self.assertFileEqual("[%s]\n"
395
"push_location = foo" % getcwd(),
494
"push_location = foo" % self.get_branch().base[:-1],
398
497
# TODO RBC 20051029 test getting a push location from a branch in a
399
498
# recursive section - that is, it appends the branch name.
501
class TestFormat(TestCaseWithBranch):
502
"""Tests for the format itself."""
504
def test_format_initialize_find_open(self):
505
# loopback test to check the current format initializes to itself.
506
if not self.branch_format.is_supported():
507
# unsupported formats are not loopback testable
508
# because the default open will not open them and
509
# they may not be initializable.
511
# supported formats must be able to init and open
512
t = get_transport(self.get_url())
513
readonly_t = get_transport(self.get_readonly_url())
514
made_branch = self.branch_format.initialize(t.base)
515
self.failUnless(isinstance(made_branch, branch.Branch))
516
self.assertEqual(self.branch_format,
517
branch.BzrBranchFormat.find_format(readonly_t))
518
direct_opened_branch = self.branch_format.open(readonly_t)
519
opened_branch = branch.Branch.open(t.base)
520
self.assertEqual(made_branch._branch_format,
521
opened_branch._branch_format)
522
self.assertEqual(direct_opened_branch._branch_format,
523
opened_branch._branch_format)
524
self.failUnless(isinstance(opened_branch, branch.Branch))
526
def test_open_not_branch(self):
    """Opening the format on the bare readonly transport raises NoSuchFile."""
    readonly_transport = get_transport(self.get_readonly_url())
    # NOTE(review): NoSuchFile is not among the visibly imported names, so
    # it is reached through the ``errors`` module alias, as the file does
    # for errors.LockError and errors.InvalidRevisionId.
    self.assertRaises(errors.NoSuchFile,
                      self.branch_format.open,
                      readonly_transport)