/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testbranch.py

Merged Martin

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# (C) 2005 Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
import os
 
18
 
 
19
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
 
20
from bzrlib.clone import copy_branch
 
21
from bzrlib.commit import commit
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
24
import bzrlib.gpg
 
25
from bzrlib.selftest import TestCase, TestCaseInTempDir
 
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
 
27
from bzrlib.trace import mutter
 
28
import bzrlib.transactions as transactions
 
29
from bzrlib.revision import NULL_REVISION
 
30
 
 
31
# TODO: Make a branch using basis branch, and check that it 
 
32
# doesn't request any files that could have been avoided, by 
 
33
# hooking into the Transport.
 
34
 
 
35
class TestBranch(TestCaseInTempDir):
 
36
 
 
37
    def test_append_revisions(self):
 
38
        """Test appending more than one revision"""
 
39
        br = Branch.initialize(".")
 
40
        br.append_revision("rev1")
 
41
        self.assertEquals(br.revision_history(), ["rev1",])
 
42
        br.append_revision("rev2", "rev3")
 
43
        self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
 
44
 
 
45
    def test_fetch_revisions(self):
 
46
        """Test fetch-revision operation."""
 
47
        from bzrlib.fetch import Fetcher
 
48
        os.mkdir('b1')
 
49
        os.mkdir('b2')
 
50
        b1 = Branch.initialize('b1')
 
51
        b2 = Branch.initialize('b2')
 
52
        file(os.sep.join(['b1', 'foo']), 'w').write('hello')
 
53
        b1.add(['foo'], ['foo-id'])
 
54
        b1.commit('lala!', rev_id='revision-1', allow_pointless=False)
 
55
 
 
56
        mutter('start fetch')
 
57
        f = Fetcher(from_branch=b1, to_branch=b2)
 
58
        eq = self.assertEquals
 
59
        eq(f.count_copied, 1)
 
60
        eq(f.last_revision, 'revision-1')
 
61
 
 
62
        rev = b2.get_revision('revision-1')
 
63
        tree = b2.revision_tree('revision-1')
 
64
        eq(tree.get_file_text('foo-id'), 'hello')
 
65
 
 
66
    def test_revision_tree(self):
 
67
        b1 = Branch.initialize('.')
 
68
        b1.commit('lala!', rev_id='revision-1', allow_pointless=True)
 
69
        tree = b1.revision_tree('revision-1')
 
70
        tree = b1.revision_tree(None)
 
71
        self.assertEqual(len(tree.list_files()), 0)
 
72
        tree = b1.revision_tree(NULL_REVISION)
 
73
        self.assertEqual(len(tree.list_files()), 0)
 
74
 
 
75
    def get_unbalanced_branch_pair(self):
 
76
        """Return two branches, a and b, with one file in a."""
 
77
        os.mkdir('a')
 
78
        br_a = Branch.initialize("a")
 
79
        file('a/b', 'wb').write('b')
 
80
        br_a.add('b')
 
81
        commit(br_a, "silly commit", rev_id='A')
 
82
        os.mkdir('b')
 
83
        br_b = Branch.initialize("b")
 
84
        return br_a, br_b
 
85
 
 
86
    def get_balanced_branch_pair(self):
 
87
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
 
88
        br_a, br_b = self.get_unbalanced_branch_pair()
 
89
        br_a.push_stores(br_b)
 
90
        return br_a, br_b
 
91
 
 
92
    def test_push_stores(self):
 
93
        """Copy the stores from one branch to another"""
 
94
        br_a, br_b = self.get_unbalanced_branch_pair()
 
95
        # ensure the revision is missing.
 
96
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
 
97
                          br_a.revision_history()[0])
 
98
        br_a.push_stores(br_b)
 
99
        # check that b now has all the data from a's first commit.
 
100
        rev = br_b.get_revision(br_a.revision_history()[0])
 
101
        tree = br_b.revision_tree(br_a.revision_history()[0])
 
102
        for file_id in tree:
 
103
            if tree.inventory[file_id].kind == "file":
 
104
                tree.get_file(file_id).read()
 
105
        return br_a, br_b
 
106
 
 
107
    def test_copy_branch(self):
 
108
        """Copy the stores from one branch to another"""
 
109
        br_a, br_b = self.get_balanced_branch_pair()
 
110
        commit(br_b, "silly commit")
 
111
        os.mkdir('c')
 
112
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
 
113
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
 
114
 
 
115
    def test_copy_partial(self):
 
116
        """Copy only part of the history of a branch."""
 
117
        self.build_tree(['a/', 'a/one'])
 
118
        br_a = Branch.initialize('a')
 
119
        br_a.add(['one'])
 
120
        br_a.commit('commit one', rev_id='u@d-1')
 
121
        self.build_tree(['a/two'])
 
122
        br_a.add(['two'])
 
123
        br_a.commit('commit two', rev_id='u@d-2')
 
124
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
 
125
        self.assertEqual(br_b.last_revision(), 'u@d-1')
 
126
        self.assertTrue(os.path.exists('b/one'))
 
127
        self.assertFalse(os.path.exists('b/two'))
 
128
        
 
129
    def test_record_initial_ghost_merge(self):
 
130
        """A pending merge with no revision present is still a merge."""
 
131
        branch = Branch.initialize('.')
 
132
        branch.add_pending_merge('non:existent@rev--ision--0--2')
 
133
        branch.commit('pretend to merge nonexistent-revision', rev_id='first')
 
134
        rev = branch.get_revision(branch.last_revision())
 
135
        self.assertEqual(len(rev.parent_ids), 1)
 
136
        # parent_sha1s is not populated now, WTF. rbc 20051003
 
137
        self.assertEqual(len(rev.parent_sha1s), 0)
 
138
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
 
139
 
 
140
    def test_bad_revision(self):
 
141
        branch = Branch.initialize('.')
 
142
        self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
 
143
 
 
144
# TODO 20051003 RBC:
 
145
# compare the gpg-to-sign info for a commit with a ghost and 
 
146
#     an identical tree without a ghost
 
147
# fetch missing should rewrite the TOC of weaves to list newly available parents.
 
148
        
 
149
    def test_pending_merges(self):
 
150
        """Tracking pending-merged revisions."""
 
151
        b = Branch.initialize('.')
 
152
 
 
153
        self.assertEquals(b.pending_merges(), [])
 
154
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
 
155
        self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
156
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
 
157
        self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
158
        b.add_pending_merge('wibble@fofof--20050401--1928390812')
 
159
        self.assertEquals(b.pending_merges(),
 
160
                          ['foo@azkhazan-123123-abcabc',
 
161
                           'wibble@fofof--20050401--1928390812'])
 
162
        b.commit("commit from base with two merges")
 
163
        rev = b.get_revision(b.revision_history()[0])
 
164
        self.assertEquals(len(rev.parent_ids), 2)
 
165
        self.assertEquals(rev.parent_ids[0],
 
166
                          'foo@azkhazan-123123-abcabc')
 
167
        self.assertEquals(rev.parent_ids[1],
 
168
                           'wibble@fofof--20050401--1928390812')
 
169
        # list should be cleared when we do a commit
 
170
        self.assertEquals(b.pending_merges(), [])
 
171
 
 
172
    def test_sign_existing_revision(self):
 
173
        branch = Branch.initialize('.')
 
174
        branch.commit("base", allow_pointless=True, rev_id='A')
 
175
        from bzrlib.testament import Testament
 
176
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
 
177
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
 
178
                         branch.revision_store.get('A', 'sig').read())
 
179
 
 
180
    def test_store_signature(self):
 
181
        branch = Branch.initialize('.')
 
182
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
 
183
                                        'FOO', 'A')
 
184
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
 
185
 
 
186
    def test__relcontrolfilename(self):
 
187
        branch = Branch.initialize('.')
 
188
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
 
189
        
 
190
    def test__relcontrolfilename_empty(self):
 
191
        branch = Branch.initialize('.')
 
192
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
 
193
 
 
194
    def test_nicks(self):
 
195
        """Branch nicknames"""
 
196
        os.mkdir('bzr.dev')
 
197
        branch = Branch.initialize('bzr.dev')
 
198
        self.assertEqual(branch.nick, 'bzr.dev')
 
199
        os.rename('bzr.dev', 'bzr.ab')
 
200
        branch = Branch.open('bzr.ab')
 
201
        self.assertEqual(branch.nick, 'bzr.ab')
 
202
        branch.nick = "Aaron's branch"
 
203
        branch.nick = "Aaron's branch"
 
204
        self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
 
205
        self.assertEqual(branch.nick, "Aaron's branch")
 
206
        os.rename('bzr.ab', 'integration')
 
207
        branch = Branch.open('integration')
 
208
        self.assertEqual(branch.nick, "Aaron's branch")
 
209
        branch.nick = u"\u1234"
 
210
        self.assertEqual(branch.nick, u"\u1234")
 
211
 
 
212
 
 
213
class TestRemote(TestCaseWithWebserver):
 
214
 
 
215
    def test_open_containing(self):
 
216
        self.assertRaises(NotBranchError, Branch.open_containing,
 
217
                          self.get_remote_url(''))
 
218
        self.assertRaises(NotBranchError, Branch.open_containing,
 
219
                          self.get_remote_url('g/p/q'))
 
220
        b = Branch.initialize('.')
 
221
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
 
222
        self.assertEqual('', relpath)
 
223
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
 
224
        self.assertEqual('g/p/q', relpath)
 
225
        
 
226
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
 
227
#         >>> from bzrlib.commit import commit
 
228
#         >>> bzrlib.trace.silent = True
 
229
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
 
230
#         >>> br1.add('foo')
 
231
#         >>> br1.add('bar')
 
232
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
 
233
#         >>> br2 = ScratchBranch()
 
234
#         >>> br2.update_revisions(br1)
 
235
#         Added 2 texts.
 
236
#         Added 1 inventories.
 
237
#         Added 1 revisions.
 
238
#         >>> br2.revision_history()
 
239
#         [u'REVISION-ID-1']
 
240
#         >>> br2.update_revisions(br1)
 
241
#         Added 0 revisions.
 
242
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
 
243
#         True
 
244
 
 
245
class InstrumentedTransaction(object):
 
246
 
 
247
    def finish(self):
 
248
        self.calls.append('finish')
 
249
 
 
250
    def __init__(self):
 
251
        self.calls = []
 
252
 
 
253
 
 
254
class TestDecorator(object):
 
255
 
 
256
    def __init__(self):
 
257
        self._calls = []
 
258
 
 
259
    def lock_read(self):
 
260
        self._calls.append('lr')
 
261
 
 
262
    def lock_write(self):
 
263
        self._calls.append('lw')
 
264
 
 
265
    def unlock(self):
 
266
        self._calls.append('ul')
 
267
 
 
268
    @needs_read_lock
 
269
    def do_with_read(self):
 
270
        return 1
 
271
 
 
272
    @needs_read_lock
 
273
    def except_with_read(self):
 
274
        raise RuntimeError
 
275
 
 
276
    @needs_write_lock
 
277
    def do_with_write(self):
 
278
        return 2
 
279
 
 
280
    @needs_write_lock
 
281
    def except_with_write(self):
 
282
        raise RuntimeError
 
283
 
 
284
 
 
285
class TestDecorators(TestCase):
 
286
 
 
287
    def test_needs_read_lock(self):
 
288
        branch = TestDecorator()
 
289
        self.assertEqual(1, branch.do_with_read())
 
290
        self.assertEqual(['lr', 'ul'], branch._calls)
 
291
 
 
292
    def test_excepts_in_read_lock(self):
 
293
        branch = TestDecorator()
 
294
        self.assertRaises(RuntimeError, branch.except_with_read)
 
295
        self.assertEqual(['lr', 'ul'], branch._calls)
 
296
 
 
297
    def test_needs_write_lock(self):
 
298
        branch = TestDecorator()
 
299
        self.assertEqual(2, branch.do_with_write())
 
300
        self.assertEqual(['lw', 'ul'], branch._calls)
 
301
 
 
302
    def test_excepts_in_write_lock(self):
 
303
        branch = TestDecorator()
 
304
        self.assertRaises(RuntimeError, branch.except_with_write)
 
305
        self.assertEqual(['lw', 'ul'], branch._calls)
 
306
 
 
307
 
 
308
class TestBranchTransaction(TestCaseInTempDir):
 
309
 
 
310
    def setUp(self):
 
311
        super(TestBranchTransaction, self).setUp()
 
312
        self.branch = Branch.initialize('.')
 
313
        
 
314
    def test_default_get_transaction(self):
 
315
        """branch.get_transaction on a new branch should give a PassThrough."""
 
316
        self.failUnless(isinstance(self.branch.get_transaction(),
 
317
                                   transactions.PassThroughTransaction))
 
318
 
 
319
    def test__set_new_transaction(self):
 
320
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
321
 
 
322
    def test__set_over_existing_transaction_raises(self):
 
323
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
324
        self.assertRaises(errors.LockError,
 
325
                          self.branch._set_transaction,
 
326
                          transactions.ReadOnlyTransaction())
 
327
 
 
328
    def test_finish_no_transaction_raises(self):
 
329
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
 
330
 
 
331
    def test_finish_readonly_transaction_works(self):
 
332
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
333
        self.branch._finish_transaction()
 
334
        self.assertEqual(None, self.branch._transaction)
 
335
 
 
336
    def test_unlock_calls_finish(self):
 
337
        self.branch.lock_read()
 
338
        transaction = InstrumentedTransaction()
 
339
        self.branch._transaction = transaction
 
340
        self.branch.unlock()
 
341
        self.assertEqual(['finish'], transaction.calls)
 
342
 
 
343
    def test_lock_read_acquires_ro_transaction(self):
 
344
        self.branch.lock_read()
 
345
        self.failUnless(isinstance(self.branch.get_transaction(),
 
346
                                   transactions.ReadOnlyTransaction))
 
347
        self.branch.unlock()
 
348
        
 
349
    def test_lock_write_acquires_passthrough_transaction(self):
 
350
        self.branch.lock_write()
 
351
        # cannot use get_transaction as its magic
 
352
        self.failUnless(isinstance(self.branch._transaction,
 
353
                                   transactions.PassThroughTransaction))
 
354
        self.branch.unlock()
 
355
 
 
356
 
 
357
class TestBranchPushLocations(TestCaseInTempDir):
 
358
 
 
359
    def setUp(self):
 
360
        super(TestBranchPushLocations, self).setUp()
 
361
        self.branch = Branch.initialize('.')
 
362
        
 
363
    def test_get_push_location_unset(self):
 
364
        self.assertEqual(None, self.branch.get_push_location())
 
365
 
 
366
    def test_get_push_location_exact(self):
 
367
        self.build_tree(['.bazaar/'])
 
368
        print >> open('.bazaar/branches.conf', 'wt'), ("[%s]\n"
 
369
                                                       "push_location=foo" %
 
370
                                                       os.getcwdu())
 
371
        self.assertEqual("foo", self.branch.get_push_location())
 
372
 
 
373
    def test_set_push_location(self):
 
374
        self.branch.set_push_location('foo')
 
375
        self.assertFileEqual("[%s]\n"
 
376
                             "push_location = foo" % os.getcwdu(),
 
377
                             '.bazaar/branches.conf')
 
378
 
 
379
    # TODO RBC 20051029 test getting a push location from a branch in a 
 
380
    # recursive section - that is, it appends the branch name.