/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/branch_implementations/test_branch.py

Merge with serialize-transform

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for branch implementations - tests a branch format."""
 
18
 
 
19
import os
 
20
import sys
 
21
 
 
22
from bzrlib import (
 
23
    branch,
 
24
    bzrdir,
 
25
    errors,
 
26
    gpg,
 
27
    urlutils,
 
28
    transactions,
 
29
    remote,
 
30
    repository,
 
31
    tests,
 
32
    )
 
33
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
 
34
from bzrlib.delta import TreeDelta
 
35
from bzrlib.errors import (FileExists,
 
36
                           NoSuchRevision,
 
37
                           NoSuchFile,
 
38
                           UninitializableFormat,
 
39
                           NotBranchError,
 
40
                           )
 
41
from bzrlib.osutils import getcwd
 
42
import bzrlib.revision
 
43
from bzrlib.symbol_versioning import deprecated_in
 
44
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
 
45
from bzrlib.tests.branch_implementations import TestCaseWithBranch
 
46
from bzrlib.tests.http_server import HttpServer
 
47
from bzrlib.trace import mutter
 
48
from bzrlib.transport import get_transport
 
49
from bzrlib.transport.memory import MemoryServer
 
50
from bzrlib.upgrade import upgrade
 
51
from bzrlib.workingtree import WorkingTree
 
52
 
 
53
 
 
54
class TestBranch(TestCaseWithBranch):
 
55
 
 
56
    def test_create_tree_with_merge(self):
 
57
        tree = self.create_tree_with_merge()
 
58
        tree.lock_read()
 
59
        self.addCleanup(tree.unlock)
 
60
        graph = tree.branch.repository.get_graph()
 
61
        ancestry_graph = graph.get_parent_map(
 
62
            tree.branch.repository.all_revision_ids())
 
63
        self.assertEqual({'rev-1':('null:',),
 
64
                          'rev-2':('rev-1', ),
 
65
                          'rev-1.1.1':('rev-1', ),
 
66
                          'rev-3':('rev-2', 'rev-1.1.1', ),
 
67
                         }, ancestry_graph)
 
68
 
 
69
    def test_revision_ids_are_utf8(self):
 
70
        wt = self.make_branch_and_tree('tree')
 
71
        wt.commit('f', rev_id='rev1')
 
72
        wt.commit('f', rev_id='rev2')
 
73
        wt.commit('f', rev_id='rev3')
 
74
 
 
75
        br = self.get_branch()
 
76
        br.fetch(wt.branch)
 
77
        br.set_revision_history(['rev1', 'rev2', 'rev3'])
 
78
        rh = br.revision_history()
 
79
        self.assertEqual(['rev1', 'rev2', 'rev3'], rh)
 
80
        for revision_id in rh:
 
81
            self.assertIsInstance(revision_id, str)
 
82
        last = br.last_revision()
 
83
        self.assertEqual('rev3', last)
 
84
        self.assertIsInstance(last, str)
 
85
        revno, last = br.last_revision_info()
 
86
        self.assertEqual(3, revno)
 
87
        self.assertEqual('rev3', last)
 
88
        self.assertIsInstance(last, str)
 
89
 
 
90
    def test_fetch_revisions(self):
 
91
        """Test fetch-revision operation."""
 
92
        wt = self.make_branch_and_tree('b1')
 
93
        b1 = wt.branch
 
94
        self.build_tree_contents([('b1/foo', 'hello')])
 
95
        wt.add(['foo'], ['foo-id'])
 
96
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
 
97
 
 
98
        b2 = self.make_branch('b2')
 
99
        self.assertEqual((1, []), b2.fetch(b1))
 
100
 
 
101
        rev = b2.repository.get_revision('revision-1')
 
102
        tree = b2.repository.revision_tree('revision-1')
 
103
        tree.lock_read()
 
104
        self.addCleanup(tree.unlock)
 
105
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')
 
106
 
 
107
    def test_get_revision_delta(self):
 
108
        tree_a = self.make_branch_and_tree('a')
 
109
        self.build_tree(['a/foo'])
 
110
        tree_a.add('foo', 'file1')
 
111
        tree_a.commit('rev1', rev_id='rev1')
 
112
        self.build_tree(['a/vla'])
 
113
        tree_a.add('vla', 'file2')
 
114
        tree_a.commit('rev2', rev_id='rev2')
 
115
 
 
116
        delta = tree_a.branch.get_revision_delta(1)
 
117
        self.assertIsInstance(delta, TreeDelta)
 
118
        self.assertEqual([('foo', 'file1', 'file')], delta.added)
 
119
        delta = tree_a.branch.get_revision_delta(2)
 
120
        self.assertIsInstance(delta, TreeDelta)
 
121
        self.assertEqual([('vla', 'file2', 'file')], delta.added)
 
122
 
 
123
    def get_unbalanced_tree_pair(self):
 
124
        """Return two branches, a and b, with one file in a."""
 
125
        tree_a = self.make_branch_and_tree('a')
 
126
        self.build_tree_contents([('a/b', 'b')])
 
127
        tree_a.add('b')
 
128
        tree_a.commit("silly commit", rev_id='A')
 
129
 
 
130
        tree_b = self.make_branch_and_tree('b')
 
131
        return tree_a, tree_b
 
132
 
 
133
    def get_balanced_branch_pair(self):
 
134
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
 
135
        tree_a, tree_b = self.get_unbalanced_tree_pair()
 
136
        tree_b.branch.repository.fetch(tree_a.branch.repository)
 
137
        return tree_a, tree_b
 
138
 
 
139
    def test_clone_partial(self):
 
140
        """Copy only part of the history of a branch."""
 
141
        # TODO: RBC 20060208 test with a revision not on revision-history.
 
142
        #       what should that behaviour be ? Emailed the list.
 
143
        # First, make a branch with two commits.
 
144
        wt_a = self.make_branch_and_tree('a')
 
145
        self.build_tree(['a/one'])
 
146
        wt_a.add(['one'])
 
147
        wt_a.commit('commit one', rev_id='1')
 
148
        self.build_tree(['a/two'])
 
149
        wt_a.add(['two'])
 
150
        wt_a.commit('commit two', rev_id='2')
 
151
        # Now make a copy of the repository.
 
152
        repo_b = self.make_repository('b')
 
153
        wt_a.branch.repository.copy_content_into(repo_b)
 
154
        # wt_a might be a lightweight checkout, so get a hold of the actual
 
155
        # branch (because you can't do a partial clone of a lightweight
 
156
        # checkout).
 
157
        branch = wt_a.branch.bzrdir.open_branch()
 
158
        # Then make a branch where the new repository is, but specify a revision
 
159
        # ID.  The new branch's history will stop at the specified revision.
 
160
        br_b = branch.clone(repo_b.bzrdir, revision_id='1')
 
161
        self.assertEqual('1', br_b.last_revision())
 
162
 
 
163
    def get_parented_branch(self):
 
164
        wt_a = self.make_branch_and_tree('a')
 
165
        self.build_tree(['a/one'])
 
166
        wt_a.add(['one'])
 
167
        wt_a.commit('commit one', rev_id='1')
 
168
 
 
169
        branch_b = wt_a.branch.bzrdir.sprout('b', revision_id='1').open_branch()
 
170
        self.assertEqual(wt_a.branch.base, branch_b.get_parent())
 
171
        return branch_b
 
172
 
 
173
    def test_clone_branch_nickname(self):
 
174
        # test the nick name is preserved always
 
175
        raise TestSkipped('XXX branch cloning is not yet tested.')
 
176
 
 
177
    def test_clone_branch_parent(self):
 
178
        # test the parent is preserved always
 
179
        branch_b = self.get_parented_branch()
 
180
        repo_c = self.make_repository('c')
 
181
        branch_b.repository.copy_content_into(repo_c)
 
182
        branch_c = branch_b.clone(repo_c.bzrdir)
 
183
        self.assertNotEqual(None, branch_c.get_parent())
 
184
        self.assertEqual(branch_b.get_parent(), branch_c.get_parent())
 
185
 
 
186
        # We can also set a specific parent, and it should be honored
 
187
        random_parent = 'http://bazaar-vcs.org/path/to/branch'
 
188
        branch_b.set_parent(random_parent)
 
189
        repo_d = self.make_repository('d')
 
190
        branch_b.repository.copy_content_into(repo_d)
 
191
        branch_d = branch_b.clone(repo_d.bzrdir)
 
192
        self.assertEqual(random_parent, branch_d.get_parent())
 
193
 
 
194
    def test_submit_branch(self):
 
195
        """Submit location can be queried and set"""
 
196
        branch = self.make_branch('branch')
 
197
        self.assertEqual(branch.get_submit_branch(), None)
 
198
        branch.set_submit_branch('sftp://example.com')
 
199
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
 
200
        branch.set_submit_branch('sftp://example.net')
 
201
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
 
202
        
 
203
    def test_public_branch(self):
 
204
        """public location can be queried and set"""
 
205
        branch = self.make_branch('branch')
 
206
        self.assertEqual(branch.get_public_branch(), None)
 
207
        branch.set_public_branch('sftp://example.com')
 
208
        self.assertEqual(branch.get_public_branch(), 'sftp://example.com')
 
209
        branch.set_public_branch('sftp://example.net')
 
210
        self.assertEqual(branch.get_public_branch(), 'sftp://example.net')
 
211
        branch.set_public_branch(None)
 
212
        self.assertEqual(branch.get_public_branch(), None)
 
213
 
 
214
    def test_record_initial_ghost(self):
 
215
        """Branches should support having ghosts."""
 
216
        wt = self.make_branch_and_tree('.')
 
217
        wt.set_parent_ids(['non:existent@rev--ision--0--2'],
 
218
            allow_leftmost_as_ghost=True)
 
219
        self.assertEqual(['non:existent@rev--ision--0--2'],
 
220
            wt.get_parent_ids())
 
221
        rev_id = wt.commit('commit against a ghost first parent.')
 
222
        rev = wt.branch.repository.get_revision(rev_id)
 
223
        self.assertEqual(rev.parent_ids, ['non:existent@rev--ision--0--2'])
 
224
        # parent_sha1s is not populated now, WTF. rbc 20051003
 
225
        self.assertEqual(len(rev.parent_sha1s), 0)
 
226
 
 
227
    def test_record_two_ghosts(self):
 
228
        """Recording with all ghosts works."""
 
229
        wt = self.make_branch_and_tree('.')
 
230
        wt.set_parent_ids([
 
231
                'foo@azkhazan-123123-abcabc',
 
232
                'wibble@fofof--20050401--1928390812',
 
233
            ],
 
234
            allow_leftmost_as_ghost=True)
 
235
        rev_id = wt.commit("commit from ghost base with one merge")
 
236
        # the revision should have been committed with two parents
 
237
        rev = wt.branch.repository.get_revision(rev_id)
 
238
        self.assertEqual(['foo@azkhazan-123123-abcabc',
 
239
            'wibble@fofof--20050401--1928390812'],
 
240
            rev.parent_ids)
 
241
 
 
242
    def test_bad_revision(self):
 
243
        self.assertRaises(errors.InvalidRevisionId,
 
244
                          self.get_branch().repository.get_revision,
 
245
                          None)
 
246
 
 
247
# TODO 20051003 RBC:
 
248
# compare the gpg-to-sign info for a commit with a ghost and 
 
249
#     an identical tree without a ghost
 
250
# fetch missing should rewrite the TOC of weaves to list newly available parents.
 
251
        
 
252
    def test_sign_existing_revision(self):
 
253
        wt = self.make_branch_and_tree('.')
 
254
        branch = wt.branch
 
255
        wt.commit("base", allow_pointless=True, rev_id='A')
 
256
        from bzrlib.testament import Testament
 
257
        strategy = gpg.LoopbackGPGStrategy(None)
 
258
        branch.repository.lock_write()
 
259
        branch.repository.start_write_group()
 
260
        branch.repository.sign_revision('A', strategy)
 
261
        branch.repository.commit_write_group()
 
262
        branch.repository.unlock()
 
263
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n' +
 
264
                         Testament.from_revision(branch.repository,
 
265
                         'A').as_short_text() +
 
266
                         '-----END PSEUDO-SIGNED CONTENT-----\n',
 
267
                         branch.repository.get_signature_text('A'))
 
268
 
 
269
    def test_store_signature(self):
 
270
        wt = self.make_branch_and_tree('.')
 
271
        branch = wt.branch
 
272
        branch.lock_write()
 
273
        try:
 
274
            branch.repository.start_write_group()
 
275
            try:
 
276
                branch.repository.store_revision_signature(
 
277
                    gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
 
278
            except:
 
279
                branch.repository.abort_write_group()
 
280
                raise
 
281
            else:
 
282
                branch.repository.commit_write_group()
 
283
        finally:
 
284
            branch.unlock()
 
285
        # A signature without a revision should not be accessible.
 
286
        self.assertRaises(errors.NoSuchRevision,
 
287
                          branch.repository.has_signature_for_revision_id,
 
288
                          'A')
 
289
        wt.commit("base", allow_pointless=True, rev_id='A')
 
290
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n'
 
291
                         'FOO-----END PSEUDO-SIGNED CONTENT-----\n',
 
292
                         branch.repository.get_signature_text('A'))
 
293
 
 
294
    def test_branch_keeps_signatures(self):
 
295
        wt = self.make_branch_and_tree('source')
 
296
        wt.commit('A', allow_pointless=True, rev_id='A')
 
297
        repo = wt.branch.repository
 
298
        repo.lock_write()
 
299
        repo.start_write_group()
 
300
        repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
 
301
        repo.commit_write_group()
 
302
        repo.unlock()
 
303
        #FIXME: clone should work to urls,
 
304
        # wt.clone should work to disks.
 
305
        self.build_tree(['target/'])
 
306
        d2 = repo.bzrdir.clone(urlutils.local_path_to_url('target'))
 
307
        self.assertEqual(repo.get_signature_text('A'),
 
308
                         d2.open_repository().get_signature_text('A'))
 
309
 
 
310
    def test_missing_revisions(self):
 
311
        t1 = self.make_branch_and_tree('b1')
 
312
        rev1 = t1.commit('one')
 
313
        t2 = t1.bzrdir.sprout('b2').open_workingtree()
 
314
        rev2 = t1.commit('two')
 
315
        rev3 = t1.commit('three')
 
316
 
 
317
        self.assertEqual([rev2, rev3],
 
318
            self.applyDeprecated(deprecated_in((1, 6, 0)),
 
319
            t2.branch.missing_revisions, t1.branch))
 
320
 
 
321
        self.assertEqual([],
 
322
            self.applyDeprecated(deprecated_in((1, 6, 0)),
 
323
            t2.branch.missing_revisions, t1.branch, stop_revision=1))
 
324
        self.assertEqual([rev2],
 
325
            self.applyDeprecated(deprecated_in((1, 6, 0)),
 
326
            t2.branch.missing_revisions, t1.branch, stop_revision=2))
 
327
        self.assertEqual([rev2, rev3],
 
328
            self.applyDeprecated(deprecated_in((1, 6, 0)),
 
329
            t2.branch.missing_revisions, t1.branch, stop_revision=3))
 
330
 
 
331
        self.assertRaises(errors.NoSuchRevision,
 
332
            self.applyDeprecated, deprecated_in((1, 6, 0)),
 
333
            t2.branch.missing_revisions, t1.branch, stop_revision=4)
 
334
 
 
335
        rev4 = t2.commit('four')
 
336
        self.assertRaises(errors.DivergedBranches,
 
337
            self.applyDeprecated, deprecated_in((1, 6, 0)),
 
338
            t2.branch.missing_revisions, t1.branch)
 
339
 
 
340
    def test_nicks(self):
 
341
        """Test explicit and implicit branch nicknames.
 
342
        
 
343
        Nicknames are implicitly the name of the branch's directory, unless an
 
344
        explicit nickname is set.  That is, an explicit nickname always
 
345
        overrides the implicit one.
 
346
        """
 
347
        t = get_transport(self.get_url())
 
348
        branch = self.make_branch('bzr.dev')
 
349
        # The nick will be 'bzr.dev', because there is no explicit nick set.
 
350
        self.assertEqual(branch.nick, 'bzr.dev')
 
351
        # Move the branch to a different directory, 'bzr.ab'.  Now that branch
 
352
        # will report its nick as 'bzr.ab'.
 
353
        t.move('bzr.dev', 'bzr.ab')
 
354
        branch = Branch.open(self.get_url('bzr.ab'))
 
355
        self.assertEqual(branch.nick, 'bzr.ab')
 
356
        # Set the branch nick explicitly.  This will ensure there's a branch
 
357
        # config file in the branch.
 
358
        branch.nick = "Aaron's branch"
 
359
        if not isinstance(branch, remote.RemoteBranch):
 
360
            self.failUnless(branch._transport.has("branch.conf"))
 
361
        # Because the nick has been set explicitly, the nick is now always
 
362
        # "Aaron's branch", regardless of directory name.
 
363
        self.assertEqual(branch.nick, "Aaron's branch")
 
364
        t.move('bzr.ab', 'integration')
 
365
        branch = Branch.open(self.get_url('integration'))
 
366
        self.assertEqual(branch.nick, "Aaron's branch")
 
367
        branch.nick = u"\u1234"
 
368
        self.assertEqual(branch.nick, u"\u1234")
 
369
 
 
370
    def test_commit_nicks(self):
 
371
        """Nicknames are committed to the revision"""
 
372
        wt = self.make_branch_and_tree('bzr.dev')
 
373
        branch = wt.branch
 
374
        branch.nick = "My happy branch"
 
375
        wt.commit('My commit respect da nick.')
 
376
        committed = branch.repository.get_revision(branch.last_revision())
 
377
        self.assertEqual(committed.properties["branch-nick"],
 
378
                         "My happy branch")
 
379
 
 
380
    def test_create_open_branch_uses_repository(self):
 
381
        try:
 
382
            repo = self.make_repository('.', shared=True)
 
383
        except errors.IncompatibleFormat:
 
384
            return
 
385
        child_transport = repo.bzrdir.root_transport.clone('child')
 
386
        child_transport.mkdir('.')
 
387
        child_dir = self.bzrdir_format.initialize_on_transport(child_transport)
 
388
        try:
 
389
            child_branch = self.branch_format.initialize(child_dir)
 
390
        except errors.UninitializableFormat:
 
391
            # branch references are not default init'able.
 
392
            return
 
393
        self.assertEqual(repo.bzrdir.root_transport.base,
 
394
                         child_branch.repository.bzrdir.root_transport.base)
 
395
        child_branch = branch.Branch.open(self.get_url('child'))
 
396
        self.assertEqual(repo.bzrdir.root_transport.base,
 
397
                         child_branch.repository.bzrdir.root_transport.base)
 
398
 
 
399
    def test_format_description(self):
 
400
        tree = self.make_branch_and_tree('tree')
 
401
        text = tree.branch._format.get_format_description()
 
402
        self.failUnless(len(text))
 
403
 
 
404
    def test_get_commit_builder(self):
 
405
        branch = self.make_branch(".")
 
406
        branch.lock_write()
 
407
        builder = branch.get_commit_builder([])
 
408
        self.assertIsInstance(builder, repository.CommitBuilder)
 
409
        branch.repository.commit_write_group()
 
410
        branch.unlock()
 
411
 
 
412
    def test_generate_revision_history(self):
 
413
        """Create a fake revision history easily."""
 
414
        tree = self.make_branch_and_tree('.')
 
415
        rev1 = tree.commit('foo')
 
416
        orig_history = tree.branch.revision_history()
 
417
        rev2 = tree.commit('bar', allow_pointless=True)
 
418
        tree.branch.generate_revision_history(rev1)
 
419
        self.assertEqual(orig_history, tree.branch.revision_history())
 
420
 
 
421
    def test_generate_revision_history_NULL_REVISION(self):
 
422
        tree = self.make_branch_and_tree('.')
 
423
        rev1 = tree.commit('foo')
 
424
        tree.branch.generate_revision_history(bzrlib.revision.NULL_REVISION)
 
425
        self.assertEqual([], tree.branch.revision_history())
 
426
 
 
427
    def test_create_checkout(self):
 
428
        tree_a = self.make_branch_and_tree('a')
 
429
        branch_a = tree_a.branch
 
430
        checkout_b = branch_a.create_checkout('b')
 
431
        self.assertEqual('null:', checkout_b.last_revision())
 
432
        checkout_b.commit('rev1', rev_id='rev1')
 
433
        self.assertEqual('rev1', branch_a.last_revision())
 
434
        self.assertNotEqual(checkout_b.branch.base, branch_a.base)
 
435
 
 
436
        checkout_c = branch_a.create_checkout('c', lightweight=True)
 
437
        self.assertEqual('rev1', checkout_c.last_revision())
 
438
        checkout_c.commit('rev2', rev_id='rev2')
 
439
        self.assertEqual('rev2', branch_a.last_revision())
 
440
        self.assertEqual(checkout_c.branch.base, branch_a.base)
 
441
 
 
442
        os.mkdir('d')
 
443
        checkout_d = branch_a.create_checkout('d', lightweight=True)
 
444
        self.assertEqual('rev2', checkout_d.last_revision())
 
445
        os.mkdir('e')
 
446
        checkout_e = branch_a.create_checkout('e')
 
447
        self.assertEqual('rev2', checkout_e.last_revision())
 
448
 
 
449
    def test_create_anonymous_lightweight_checkout(self):
 
450
        """A lightweight checkout from a readonly branch should succeed."""
 
451
        tree_a = self.make_branch_and_tree('a')
 
452
        rev_id = tree_a.commit('put some content in the branch')
 
453
        # open the branch via a readonly transport
 
454
        source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
 
455
        # sanity check that the test will be valid
 
456
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
 
457
            source_branch.lock_write)
 
458
        checkout = source_branch.create_checkout('c', lightweight=True)
 
459
        self.assertEqual(rev_id, checkout.last_revision())
 
460
 
 
461
    def test_create_anonymous_heavyweight_checkout(self):
 
462
        """A regular checkout from a readonly branch should succeed."""
 
463
        tree_a = self.make_branch_and_tree('a')
 
464
        rev_id = tree_a.commit('put some content in the branch')
 
465
        # open the branch via a readonly transport
 
466
        source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
 
467
        # sanity check that the test will be valid
 
468
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
 
469
            source_branch.lock_write)
 
470
        checkout = source_branch.create_checkout('c')
 
471
        self.assertEqual(rev_id, checkout.last_revision())
 
472
 
 
473
    def test_set_revision_history(self):
 
474
        tree = self.make_branch_and_tree('a')
 
475
        tree.commit('a commit', rev_id='rev1')
 
476
        br = tree.branch
 
477
        br.set_revision_history(["rev1"])
 
478
        self.assertEquals(br.revision_history(), ["rev1"])
 
479
        br.set_revision_history([])
 
480
        self.assertEquals(br.revision_history(), [])
 
481
 
 
482
 
 
483
class ChrootedTests(TestCaseWithBranch):
 
484
    """A support class that provides readonly urls outside the local namespace.
 
485
 
 
486
    This is done by checking if self.transport_server is a MemoryServer. if it
 
487
    is then we are chrooted already, if it is not then an HttpServer is used
 
488
    for readonly urls.
 
489
    """
 
490
 
 
491
    def setUp(self):
 
492
        super(ChrootedTests, self).setUp()
 
493
        if not self.vfs_transport_factory == MemoryServer:
 
494
            self.transport_readonly_server = HttpServer
 
495
 
 
496
    def test_open_containing(self):
 
497
        self.assertRaises(NotBranchError, Branch.open_containing,
 
498
                          self.get_readonly_url(''))
 
499
        self.assertRaises(NotBranchError, Branch.open_containing,
 
500
                          self.get_readonly_url('g/p/q'))
 
501
        branch = self.make_branch('.')
 
502
        branch, relpath = Branch.open_containing(self.get_readonly_url(''))
 
503
        self.assertEqual('', relpath)
 
504
        branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
 
505
        self.assertEqual('g/p/q', relpath)
 
506
        
 
507
 
 
508
class InstrumentedTransaction(object):
 
509
 
 
510
    def finish(self):
 
511
        self.calls.append('finish')
 
512
 
 
513
    def __init__(self):
 
514
        self.calls = []
 
515
 
 
516
 
 
517
class TestDecorator(object):
 
518
 
 
519
    def __init__(self):
 
520
        self._calls = []
 
521
 
 
522
    def lock_read(self):
 
523
        self._calls.append('lr')
 
524
 
 
525
    def lock_write(self):
 
526
        self._calls.append('lw')
 
527
 
 
528
    def unlock(self):
 
529
        self._calls.append('ul')
 
530
 
 
531
    @needs_read_lock
 
532
    def do_with_read(self):
 
533
        return 1
 
534
 
 
535
    @needs_read_lock
 
536
    def except_with_read(self):
 
537
        raise RuntimeError
 
538
 
 
539
    @needs_write_lock
 
540
    def do_with_write(self):
 
541
        return 2
 
542
 
 
543
    @needs_write_lock
 
544
    def except_with_write(self):
 
545
        raise RuntimeError
 
546
 
 
547
 
 
548
class TestDecorators(TestCase):
 
549
 
 
550
    def test_needs_read_lock(self):
 
551
        branch = TestDecorator()
 
552
        self.assertEqual(1, branch.do_with_read())
 
553
        self.assertEqual(['lr', 'ul'], branch._calls)
 
554
 
 
555
    def test_excepts_in_read_lock(self):
 
556
        branch = TestDecorator()
 
557
        self.assertRaises(RuntimeError, branch.except_with_read)
 
558
        self.assertEqual(['lr', 'ul'], branch._calls)
 
559
 
 
560
    def test_needs_write_lock(self):
 
561
        branch = TestDecorator()
 
562
        self.assertEqual(2, branch.do_with_write())
 
563
        self.assertEqual(['lw', 'ul'], branch._calls)
 
564
 
 
565
    def test_excepts_in_write_lock(self):
 
566
        branch = TestDecorator()
 
567
        self.assertRaises(RuntimeError, branch.except_with_write)
 
568
        self.assertEqual(['lw', 'ul'], branch._calls)
 
569
 
 
570
 
 
571
class TestBranchPushLocations(TestCaseWithBranch):
 
572
 
 
573
    def test_get_push_location_unset(self):
 
574
        self.assertEqual(None, self.get_branch().get_push_location())
 
575
 
 
576
    def test_get_push_location_exact(self):
 
577
        from bzrlib.config import (locations_config_filename,
 
578
                                   ensure_config_dir_exists)
 
579
        ensure_config_dir_exists()
 
580
        fn = locations_config_filename()
 
581
        open(fn, 'wt').write(("[%s]\n"
 
582
                                  "push_location=foo\n" %
 
583
                                  self.get_branch().base[:-1]))
 
584
        self.assertEqual("foo", self.get_branch().get_push_location())
 
585
 
 
586
    def test_set_push_location(self):
 
587
        branch = self.get_branch()
 
588
        branch.set_push_location('foo')
 
589
        self.assertEqual('foo', branch.get_push_location())
 
590
 
 
591
 
 
592
class TestFormat(TestCaseWithBranch):
 
593
    """Tests for the format itself."""
 
594
 
 
595
    def test_get_reference(self):
 
596
        """get_reference on all regular branches should return None."""
 
597
        if not self.branch_format.is_supported():
 
598
            # unsupported formats are not loopback testable
 
599
            # because the default open will not open them and
 
600
            # they may not be initializable.
 
601
            return
 
602
        made_branch = self.make_branch('.')
 
603
        self.assertEqual(None,
 
604
            made_branch._format.get_reference(made_branch.bzrdir))
 
605
 
 
606
    def test_set_reference(self):
 
607
        """set_reference on all regular branches should be callable."""
 
608
        if not self.branch_format.is_supported():
 
609
            # unsupported formats are not loopback testable
 
610
            # because the default open will not open them and
 
611
            # they may not be initializable.
 
612
            return
 
613
        this_branch = self.make_branch('this')
 
614
        other_branch = self.make_branch('other')
 
615
        try:
 
616
            this_branch._format.set_reference(this_branch.bzrdir, other_branch)
 
617
        except NotImplementedError:
 
618
            # that's ok
 
619
            pass
 
620
        else:
 
621
            ref = this_branch._format.get_reference(this_branch.bzrdir)
 
622
            self.assertEqual(ref, other_branch.base)
 
623
 
 
624
    def test_format_initialize_find_open(self):
 
625
        # loopback test to check the current format initializes to itself.
 
626
        if not self.branch_format.is_supported():
 
627
            # unsupported formats are not loopback testable
 
628
            # because the default open will not open them and
 
629
            # they may not be initializable.
 
630
            return
 
631
        # supported formats must be able to init and open
 
632
        t = get_transport(self.get_url())
 
633
        readonly_t = get_transport(self.get_readonly_url())
 
634
        made_branch = self.make_branch('.')
 
635
        self.failUnless(isinstance(made_branch, branch.Branch))
 
636
 
 
637
        # find it via bzrdir opening:
 
638
        opened_control = bzrdir.BzrDir.open(readonly_t.base)
 
639
        direct_opened_branch = opened_control.open_branch()
 
640
        self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
 
641
        self.assertEqual(opened_control, direct_opened_branch.bzrdir)
 
642
        self.failUnless(isinstance(direct_opened_branch._format,
 
643
                        self.branch_format.__class__))
 
644
 
 
645
        # find it via Branch.open
 
646
        opened_branch = branch.Branch.open(readonly_t.base)
 
647
        self.failUnless(isinstance(opened_branch, made_branch.__class__))
 
648
        self.assertEqual(made_branch._format.__class__,
 
649
                         opened_branch._format.__class__)
 
650
        # if it has a unique id string, can we probe for it ?
 
651
        try:
 
652
            self.branch_format.get_format_string()
 
653
        except NotImplementedError:
 
654
            return
 
655
        self.assertEqual(self.branch_format,
 
656
                         opened_control.find_branch_format())
 
657
 
 
658
 
 
659
class TestBound(TestCaseWithBranch):
 
660
 
 
661
    def test_bind_unbind(self):
 
662
        branch = self.make_branch('1')
 
663
        branch2 = self.make_branch('2')
 
664
        try:
 
665
            branch.bind(branch2)
 
666
        except errors.UpgradeRequired:
 
667
            raise tests.TestNotApplicable('Format does not support binding')
 
668
        self.assertTrue(branch.unbind())
 
669
        self.assertFalse(branch.unbind())
 
670
        self.assertIs(None, branch.get_bound_location())
 
671
 
 
672
    def test_old_bound_location(self):
 
673
        branch = self.make_branch('branch1')
 
674
        try:
 
675
            self.assertIs(None, branch.get_old_bound_location())
 
676
        except errors.UpgradeRequired:
 
677
            raise tests.TestNotApplicable(
 
678
                    'Format does not store old bound locations')
 
679
        branch2 = self.make_branch('branch2')
 
680
        branch.bind(branch2)
 
681
        self.assertIs(None, branch.get_old_bound_location())
 
682
        branch.unbind()
 
683
        self.assertContainsRe(branch.get_old_bound_location(), '\/branch2\/$')
 
684
 
 
685
    def test_bind_diverged(self):
 
686
        tree_a = self.make_branch_and_tree('tree_a')
 
687
        tree_a.commit('rev1a')
 
688
        tree_b = tree_a.bzrdir.sprout('tree_b').open_workingtree()
 
689
        tree_a.commit('rev2a')
 
690
        tree_b.commit('rev2b')
 
691
        try:
 
692
            tree_b.branch.bind(tree_a.branch)
 
693
        except errors.UpgradeRequired:
 
694
            raise tests.TestNotApplicable('Format does not support binding')
 
695
 
 
696
 
 
697
class TestStrict(TestCaseWithBranch):
 
698
 
 
699
    def test_strict_history(self):
 
700
        tree1 = self.make_branch_and_tree('tree1')
 
701
        try:
 
702
            tree1.branch.set_append_revisions_only(True)
 
703
        except errors.UpgradeRequired:
 
704
            raise TestSkipped('Format does not support strict history')
 
705
        tree1.commit('empty commit')
 
706
        tree2 = tree1.bzrdir.sprout('tree2').open_workingtree()
 
707
        tree2.commit('empty commit 2')
 
708
        tree1.pull(tree2.branch)
 
709
        tree1.commit('empty commit 3')
 
710
        tree2.commit('empty commit 4')
 
711
        self.assertRaises(errors.DivergedBranches, tree1.pull, tree2.branch)
 
712
        tree2.merge_from_branch(tree1.branch)
 
713
        tree2.commit('empty commit 5')
 
714
        self.assertRaises(errors.AppendRevisionsOnlyViolation, tree1.pull,
 
715
                          tree2.branch)
 
716
        tree3 = tree1.bzrdir.sprout('tree3').open_workingtree()
 
717
        tree3.merge_from_branch(tree2.branch)
 
718
        tree3.commit('empty commit 6')
 
719
        tree2.pull(tree3.branch)