/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch_implementations.py

Add a readonly decorator for transports.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# (C) 2005, 2006 Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for branch implementations - tests a branch format."""
 
18
 
 
19
import os
 
20
import sys
 
21
 
 
22
import bzrlib.branch as branch
 
23
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
 
24
from bzrlib.clone import copy_branch
 
25
from bzrlib.commit import commit
 
26
import bzrlib.errors as errors
 
27
from bzrlib.errors import (NoSuchRevision,
 
28
                           NoSuchFile,
 
29
                           UninitializableFormat,
 
30
                           NotBranchError,
 
31
                           )
 
32
import bzrlib.gpg
 
33
from bzrlib.osutils import getcwd
 
34
from bzrlib.tests import TestCase, TestCaseInTempDir, TestSkipped
 
35
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
 
36
from bzrlib.trace import mutter
 
37
import bzrlib.transactions as transactions
 
38
from bzrlib.transport import get_transport
 
39
from bzrlib.revision import NULL_REVISION
 
40
 
 
41
# TODO: Make a branch using basis branch, and check that it 
 
42
# doesn't request any files that could have been avoided, by 
 
43
# hooking into the Transport.
 
44
 
 
45
 
 
46
class TestCaseWithBranch(TestCaseInTempDir):
    """Base class for tests that need a branch in the format under test.

    The format is read from ``self.branch_format``.  This class never
    assigns that attribute, so it is presumably injected by whatever
    parameterises these tests per format -- TODO confirm against the
    test loader.
    """

    def setUp(self):
        super(TestCaseWithBranch, self).setUp()
        # Created lazily by get_branch().
        self.branch = None

    def get_branch(self):
        """Return the branch rooted at '.', creating it on first use."""
        if self.branch is None:
            self.branch = self.make_branch('.')
        return self.branch

    def make_branch(self, relpath):
        """Initialize and return a branch of self.branch_format at relpath.

        Raises TestSkipped when the format raises UninitializableFormat.
        """
        try:
            return self.branch_format.initialize(relpath)
        except UninitializableFormat:
            # The message used to be raised with a literal, unexpanded
            # "%s"; interpolate the format so the skip reason names it.
            raise TestSkipped("Format %s is not initializable."
                              % (self.branch_format,))
 
62
 
 
63
 
 
64
class TestBranch(TestCaseWithBranch):
    """Behavioural tests run against a branch of the format under test."""

    def test_append_revisions(self):
        """Test appending more than one revision"""
        br = self.get_branch()
        br.append_revision("rev1")
        self.assertEquals(br.revision_history(), ["rev1"])
        br.append_revision("rev2", "rev3")
        self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])

    def test_fetch_revisions(self):
        """Test fetch-revision operation."""
        from bzrlib.fetch import Fetcher
        os.mkdir('b1')
        os.mkdir('b2')
        b1 = self.make_branch('b1')
        b2 = self.make_branch('b2')
        # Close the file explicitly instead of relying on the garbage
        # collector to flush it before the working tree reads it.
        foo = open('b1/foo', 'w')
        try:
            foo.write('hello')
        finally:
            foo.close()
        b1.working_tree().add(['foo'], ['foo-id'])
        b1.working_tree().commit('lala!', rev_id='revision-1',
                                 allow_pointless=False)

        mutter('start fetch')
        f = Fetcher(from_branch=b1, to_branch=b2)
        eq = self.assertEquals
        eq(f.count_copied, 1)
        eq(f.last_revision, 'revision-1')

        # Only checked for not raising: the revision must now be in b2.
        b2.get_revision('revision-1')
        tree = b2.revision_tree('revision-1')
        eq(tree.get_file_text('foo-id'), 'hello')

    def test_revision_tree(self):
        """revision_tree works for a real revision, None and NULL_REVISION."""
        b1 = self.get_branch()
        b1.working_tree().commit('lala!', rev_id='revision-1',
                                 allow_pointless=True)
        # Only checked for not raising.
        b1.revision_tree('revision-1')
        tree = b1.revision_tree(None)
        self.assertEqual(len(tree.list_files()), 0)
        tree = b1.revision_tree(NULL_REVISION)
        self.assertEqual(len(tree.list_files()), 0)

    def get_unbalanced_branch_pair(self):
        """Return two branches, a and b, with one file in a."""
        os.mkdir('a')
        br_a = self.make_branch('a')
        # Explicit close so the content is on disk before it is added.
        b_file = open('a/b', 'wb')
        try:
            b_file.write('b')
        finally:
            b_file.close()
        br_a.working_tree().add('b')
        commit(br_a, "silly commit", rev_id='A')
        os.mkdir('b')
        br_b = self.make_branch('b')
        return br_a, br_b

    def get_balanced_branch_pair(self):
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
        br_a, br_b = self.get_unbalanced_branch_pair()
        br_a.push_stores(br_b)
        return br_a, br_b

    def test_push_stores(self):
        """Copy the stores from one branch to another"""
        br_a, br_b = self.get_unbalanced_branch_pair()
        # ensure the revision is missing.
        self.assertRaises(NoSuchRevision, br_b.get_revision,
                          br_a.revision_history()[0])
        br_a.push_stores(br_b)
        # check that b now has all the data from a's first commit.
        first_rev_id = br_a.revision_history()[0]
        br_b.get_revision(first_rev_id)
        tree = br_b.revision_tree(br_a.revision_history()[0])
        for file_id in tree:
            if tree.inventory[file_id].kind == "file":
                # Reading proves the text is actually retrievable from b.
                tree.get_file(file_id).read()
        # Return value kept for backward compatibility with any caller
        # that uses this test as a helper.
        return br_a, br_b

    def test_copy_branch(self):
        """Copy a branch using another branch as the basis."""
        br_a, br_b = self.get_balanced_branch_pair()
        commit(br_b, "silly commit")
        os.mkdir('c')
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
        self.assertEqual(br_a.revision_history(), br_c.revision_history())

    def test_copy_partial(self):
        """Copy only part of the history of a branch."""
        self.build_tree(['a/', 'a/one'])
        br_a = self.make_branch('a')
        br_a.working_tree().add(['one'])
        br_a.working_tree().commit('commit one', rev_id='u@d-1')
        self.build_tree(['a/two'])
        br_a.working_tree().add(['two'])
        br_a.working_tree().commit('commit two', rev_id='u@d-2')
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
        self.assertEqual(br_b.last_revision(), 'u@d-1')
        self.assertTrue(os.path.exists('b/one'))
        self.assertFalse(os.path.exists('b/two'))

    def test_record_initial_ghost_merge(self):
        """A pending merge with no revision present is still a merge."""
        branch = self.get_branch()
        branch.working_tree().add_pending_merge(
            'non:existent@rev--ision--0--2')
        branch.working_tree().commit('pretend to merge nonexistent-revision',
                                     rev_id='first')
        rev = branch.get_revision(branch.last_revision())
        self.assertEqual(len(rev.parent_ids), 1)
        # parent_sha1s is not populated now, WTF. rbc 20051003
        self.assertEqual(len(rev.parent_sha1s), 0)
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')

    def test_bad_revision(self):
        """get_revision(None) raises InvalidRevisionId."""
        self.assertRaises(errors.InvalidRevisionId,
                          self.get_branch().get_revision, None)

    # TODO 20051003 RBC:
    # compare the gpg-to-sign info for a commit with a ghost and
    #     an identical tree without a ghost
    # fetch missing should rewrite the TOC of weaves to list newly available parents.

    def test_pending_merges(self):
        """Tracking pending-merged revisions."""
        b = self.get_branch()
        wt = b.working_tree()
        self.assertEquals(wt.pending_merges(), [])
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
        # Adding the same revision again must not create a duplicate entry.
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
        wt.add_pending_merge('wibble@fofof--20050401--1928390812')
        self.assertEquals(wt.pending_merges(),
                          ['foo@azkhazan-123123-abcabc',
                           'wibble@fofof--20050401--1928390812'])
        b.working_tree().commit("commit from base with two merges")
        rev = b.get_revision(b.revision_history()[0])
        self.assertEquals(len(rev.parent_ids), 2)
        self.assertEquals(rev.parent_ids[0],
                          'foo@azkhazan-123123-abcabc')
        self.assertEquals(rev.parent_ids[1],
                          'wibble@fofof--20050401--1928390812')
        # list should be cleared when we do a commit
        self.assertEquals(wt.pending_merges(), [])

    def test_sign_existing_revision(self):
        """Signing a revision stores its short testament as the signature."""
        branch = self.get_branch()
        branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
        from bzrlib.testament import Testament
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
                         branch.revision_store.get('A', 'sig').read())

    def test_store_signature(self):
        """store_revision_signature writes the given text as the 'sig'."""
        branch = self.get_branch()
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
                                        'FOO', 'A')
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())

    def test__relcontrolfilename(self):
        self.assertEqual('.bzr/%25',
                         self.get_branch()._rel_controlfilename('%'))

    def test__relcontrolfilename_empty(self):
        self.assertEqual('.bzr', self.get_branch()._rel_controlfilename(''))

    def test_nicks(self):
        """Branch nicknames"""
        os.mkdir('bzr.dev')
        branch = self.make_branch('bzr.dev')
        self.assertEqual(branch.nick, 'bzr.dev')
        os.rename('bzr.dev', 'bzr.ab')
        branch = Branch.open('bzr.ab')
        self.assertEqual(branch.nick, 'bzr.ab')
        # NOTE(review): the nick is assigned twice in the original; it
        # presumably checks that re-setting is harmless -- confirm it is
        # not an accidental duplicate before removing either line.
        branch.nick = "Aaron's branch"
        branch.nick = "Aaron's branch"
        self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
        self.assertEqual(branch.nick, "Aaron's branch")
        os.rename('bzr.ab', 'integration')
        branch = Branch.open('integration')
        self.assertEqual(branch.nick, "Aaron's branch")
        branch.nick = u"\u1234"
        self.assertEqual(branch.nick, u"\u1234")

    def test_commit_nicks(self):
        """Nicknames are committed to the revision"""
        os.mkdir('bzr.dev')
        branch = self.get_branch()
        branch.nick = "My happy branch"
        branch.working_tree().commit('My commit respect da nick.')
        committed = branch.get_revision(branch.last_revision())
        self.assertEqual(committed.properties["branch-nick"],
                         "My happy branch")
 
247
 
 
248
 
 
249
class TestRemote(TestCaseWithWebserver):
    """Tests that open a branch through the test web server's URLs."""

    def test_open_containing(self):
        """open_containing fails without a branch, then finds it with relpath.

        Raises TestSkipped when the format raises UninitializableFormat.
        """
        # Nothing has been initialized yet, so neither URL is in a branch.
        self.assertRaises(NotBranchError, Branch.open_containing,
                          self.get_remote_url(''))
        self.assertRaises(NotBranchError, Branch.open_containing,
                          self.get_remote_url('g/p/q'))
        try:
            branch = self.branch_format.initialize('.')
        except UninitializableFormat:
            # The message used to be raised with a literal, unexpanded
            # "%s"; interpolate the format so the skip reason names it.
            raise TestSkipped("Format %s is not initializable."
                              % (self.branch_format,))
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
        self.assertEqual('', relpath)
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
        self.assertEqual('g/p/q', relpath)
 
264
        
 
265
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
 
266
#         >>> from bzrlib.commit import commit
 
267
#         >>> bzrlib.trace.silent = True
 
268
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
 
269
#         >>> br1.working_tree().add('foo')
 
270
#         >>> br1.working_tree().add('bar')
 
271
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
 
272
#         >>> br2 = ScratchBranch()
 
273
#         >>> br2.update_revisions(br1)
 
274
#         Added 2 texts.
 
275
#         Added 1 inventories.
 
276
#         Added 1 revisions.
 
277
#         >>> br2.revision_history()
 
278
#         [u'REVISION-ID-1']
 
279
#         >>> br2.update_revisions(br1)
 
280
#         Added 0 revisions.
 
281
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
 
282
#         True
 
283
 
 
284
class InstrumentedTransaction(object):
    """Transaction stand-in that records the calls made on it.

    Every call to finish() appends the string 'finish' to ``self.calls``.
    """

    def __init__(self):
        # Names of the methods invoked so far, in call order.
        self.calls = list()

    def finish(self):
        """Record that finish() was called."""
        self.calls += ['finish']
 
291
 
 
292
 
 
293
class TestDecorator(object):
    """Fake lockable object used to exercise the locking decorators.

    lock_read, lock_write and unlock only append 'lr', 'lw' and 'ul'
    respectively to ``self._calls``, so a test can inspect which of them
    the needs_read_lock / needs_write_lock decorators invoked.
    """

    def __init__(self):
        # Short codes of the locking methods called so far, in order.
        self._calls = list()

    def lock_read(self):
        """Record a read-lock request."""
        self._calls.append('lr')

    def lock_write(self):
        """Record a write-lock request."""
        self._calls.append('lw')

    def unlock(self):
        """Record an unlock request."""
        self._calls.append('ul')

    @needs_read_lock
    def do_with_read(self):
        """Return 1 from inside the read-lock decorator."""
        return 1

    @needs_read_lock
    def except_with_read(self):
        """Raise RuntimeError from inside the read-lock decorator."""
        raise RuntimeError

    @needs_write_lock
    def do_with_write(self):
        """Return 2 from inside the write-lock decorator."""
        return 2

    @needs_write_lock
    def except_with_write(self):
        """Raise RuntimeError from inside the write-lock decorator."""
        raise RuntimeError
 
322
 
 
323
 
 
324
class TestDecorators(TestCase):
    """Check the lock/unlock calls made by the locking decorators."""

    def test_needs_read_lock(self):
        """A read-decorated call returns its value between 'lr' and 'ul'."""
        decorated = TestDecorator()
        result = decorated.do_with_read()
        self.assertEqual(1, result)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_excepts_in_read_lock(self):
        """An exception under a read lock propagates and still unlocks."""
        decorated = TestDecorator()
        self.assertRaises(RuntimeError, decorated.except_with_read)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_needs_write_lock(self):
        """A write-decorated call returns its value between 'lw' and 'ul'."""
        decorated = TestDecorator()
        result = decorated.do_with_write()
        self.assertEqual(2, result)
        self.assertEqual(['lw', 'ul'], decorated._calls)

    def test_excepts_in_write_lock(self):
        """An exception under a write lock propagates and still unlocks."""
        decorated = TestDecorator()
        self.assertRaises(RuntimeError, decorated.except_with_write)
        self.assertEqual(['lw', 'ul'], decorated._calls)
 
345
 
 
346
 
 
347
class TestBranchTransaction(TestCaseWithBranch):
    """Tests for the transaction a branch holds while locked/unlocked."""

    def setUp(self):
        super(TestBranchTransaction, self).setUp()
        self.branch = None

    def test_default_get_transaction(self):
        """branch.get_transaction on a new branch should give a PassThrough."""
        self.failUnless(isinstance(self.get_branch().get_transaction(),
                                   transactions.PassThroughTransaction))

    def test__set_new_transaction(self):
        """Setting a transaction on a fresh branch does not raise."""
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())

    def test__set_over_existing_transaction_raises(self):
        """Setting a second transaction over an existing one is a LockError."""
        branch = self.get_branch()
        branch._set_transaction(transactions.ReadOnlyTransaction())
        self.assertRaises(errors.LockError,
                          branch._set_transaction,
                          transactions.ReadOnlyTransaction())

    def test_finish_no_transaction_raises(self):
        """Finishing when no transaction was set is a LockError."""
        self.assertRaises(errors.LockError,
                          self.get_branch()._finish_transaction)

    def test_finish_readonly_transaction_works(self):
        """Finishing a read-only transaction resets _transaction to None."""
        branch = self.get_branch()
        branch._set_transaction(transactions.ReadOnlyTransaction())
        branch._finish_transaction()
        self.assertEqual(None, branch._transaction)

    def test_unlock_calls_finish(self):
        """unlock() calls finish() on the branch's current transaction."""
        branch = self.get_branch()
        branch.lock_read()
        transaction = InstrumentedTransaction()
        branch._transaction = transaction
        branch.unlock()
        self.assertEqual(['finish'], transaction.calls)

    def test_lock_read_acquires_ro_transaction(self):
        """lock_read() installs a ReadOnlyTransaction."""
        branch = self.get_branch()
        branch.lock_read()
        try:
            self.failUnless(isinstance(branch.get_transaction(),
                                       transactions.ReadOnlyTransaction))
        finally:
            # Release the lock even when the assertion fails, so a failing
            # test does not leave the branch locked.
            branch.unlock()

    def test_lock_write_acquires_passthrough_transaction(self):
        """lock_write() installs a PassThroughTransaction."""
        branch = self.get_branch()
        branch.lock_write()
        try:
            # cannot use get_transaction as it's magic
            self.failUnless(isinstance(branch._transaction,
                                       transactions.PassThroughTransaction))
        finally:
            # Release the lock even when the assertion fails.
            branch.unlock()
 
394
 
 
395
 
 
396
class TestBranchPushLocations(TestCaseWithBranch):
    """Tests for reading and writing a branch's push location."""

    def test_get_push_location_unset(self):
        """With nothing configured, get_push_location() returns None."""
        self.assertEqual(None, self.get_branch().get_push_location())

    def test_get_push_location_exact(self):
        """A section named after the cwd supplies the push location."""
        from bzrlib.config import (branches_config_filename,
                                   ensure_config_dir_exists)
        ensure_config_dir_exists()
        fn = branches_config_filename()
        # Write and close the file explicitly so the contents are flushed
        # before the branch reads the configuration.  The trailing newline
        # matches what the former ``print >> open(fn, 'wt'), ...`` wrote.
        conf_file = open(fn, 'wt')
        try:
            conf_file.write("[%s]\n"
                            "push_location=foo\n" % getcwd())
        finally:
            conf_file.close()
        self.assertEqual("foo", self.get_branch().get_push_location())

    def test_set_push_location(self):
        """set_push_location() writes a section named after the cwd."""
        from bzrlib.config import (branches_config_filename,
                                   ensure_config_dir_exists)
        ensure_config_dir_exists()
        fn = branches_config_filename()
        self.get_branch().set_push_location('foo')
        self.assertFileEqual("[%s]\n"
                             "push_location = foo" % getcwd(),
                             fn)

    # TODO RBC 20051029 test getting a push location from a branch in a
    # recursive section - that is, it appends the branch name.
 
423
 
 
424
 
 
425
class TestFormat(TestCaseWithBranch):
    """Tests for the format itself."""

    def test_format_initialize_find_open(self):
        """Loopback check: the current format initializes to itself."""
        fmt = self.branch_format
        if not fmt.is_supported():
            # Unsupported formats cannot be loopback tested: the default
            # open will not open them and they may not be initializable.
            return
        # Supported formats must be able to init and open.
        transport = get_transport('.')
        made_branch = fmt.initialize(transport.base)
        self.failUnless(isinstance(made_branch, branch.Branch))
        found_format = branch.BzrBranchFormat.find_format(transport)
        self.assertEqual(fmt, found_format)
        direct_opened_branch = fmt.open(transport)
        opened_branch = branch.Branch.open(transport.base)
        # However the branch was obtained, it must report the same format.
        self.assertEqual(made_branch._branch_format,
                         opened_branch._branch_format)
        self.assertEqual(direct_opened_branch._branch_format,
                         opened_branch._branch_format)
        self.failUnless(isinstance(opened_branch, branch.Branch))

    def test_open_not_branch(self):
        """Opening the format where no branch exists raises NoSuchFile."""
        here = get_transport('.')
        self.assertRaises(NoSuchFile, self.branch_format.open, here)