/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_repository.py

  • Committer: Michael Ellerman
  • Date: 2006-03-09 00:24:48 UTC
  • mto: (1610.1.8 bzr.mbp.integration)
  • mto: This revision was merged to the branch mainline in revision 1616.
  • Revision ID: michael@ellerman.id.au-20060309002448-70cce15e3d605130
Make the "ignore line" in the commit message editor the "right" width, so
that if you make your message that wide it won't wrap in bzr log output.
Just as a visual aid.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# (C) 2006 Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for the Repository facility that are not interface tests.
 
18
 
 
19
For interface tests see tests/repository_implementations/*.py.
 
20
 
 
21
For concrete class tests see this file, and for storage formats tests
 
22
also see this file.
 
23
"""
 
24
 
 
25
from stat import *
 
26
from StringIO import StringIO
 
27
 
 
28
import bzrlib
 
29
import bzrlib.bzrdir as bzrdir
 
30
import bzrlib.errors as errors
 
31
from bzrlib.errors import (NotBranchError,
 
32
                           NoSuchFile,
 
33
                           UnknownFormatError,
 
34
                           UnsupportedFormatError,
 
35
                           )
 
36
import bzrlib.repository as repository
 
37
from bzrlib.tests import TestCase, TestCaseWithTransport
 
38
from bzrlib.transport import get_transport
 
39
from bzrlib.transport.http import HttpServer
 
40
from bzrlib.transport.memory import MemoryServer
 
41
 
 
42
 
 
43
class TestDefaultFormat(TestCase):
 
44
 
 
45
    def test_get_set_default_format(self):
 
46
        old_format = repository.RepositoryFormat.get_default_format()
 
47
        # default is None - we cannot create a Repository independently yet
 
48
        self.assertTrue(isinstance(old_format, repository.RepositoryFormat7))
 
49
        repository.RepositoryFormat.set_default_format(SampleRepositoryFormat())
 
50
        # creating a repository should now create an instrumented dir.
 
51
        try:
 
52
            # the default branch format is used by the meta dir format
 
53
            # which is not the default bzrdir format at this point
 
54
            dir = bzrdir.BzrDirMetaFormat1().initialize('memory:/')
 
55
            result = dir.create_repository()
 
56
            self.assertEqual(result, 'A bzr repository dir')
 
57
        finally:
 
58
            repository.RepositoryFormat.set_default_format(old_format)
 
59
        self.assertEqual(old_format, repository.RepositoryFormat.get_default_format())
 
60
 
 
61
 
 
62
class SampleRepositoryFormat(repository.RepositoryFormat):
 
63
    """A sample format
 
64
 
 
65
    this format is initializable, unsupported to aid in testing the 
 
66
    open and open(unsupported=True) routines.
 
67
    """
 
68
 
 
69
    def get_format_string(self):
 
70
        """See RepositoryFormat.get_format_string()."""
 
71
        return "Sample .bzr repository format."
 
72
 
 
73
    def initialize(self, a_bzrdir, shared=False):
 
74
        """Initialize a repository in a BzrDir"""
 
75
        t = a_bzrdir.get_repository_transport(self)
 
76
        t.put('format', StringIO(self.get_format_string()))
 
77
        return 'A bzr repository dir'
 
78
 
 
79
    def is_supported(self):
 
80
        return False
 
81
 
 
82
    def open(self, a_bzrdir, _found=False):
 
83
        return "opened repository."
 
84
 
 
85
 
 
86
class TestRepositoryFormat(TestCaseWithTransport):
 
87
    """Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
 
88
 
 
89
    def test_find_format(self):
 
90
        # is the right format object found for a repository?
 
91
        # create a branch with a few known format objects.
 
92
        # this is not quite the same as 
 
93
        self.build_tree(["foo/", "bar/"])
 
94
        def check_format(format, url):
 
95
            dir = format._matchingbzrdir.initialize(url)
 
96
            format.initialize(dir)
 
97
            t = get_transport(url)
 
98
            found_format = repository.RepositoryFormat.find_format(dir)
 
99
            self.failUnless(isinstance(found_format, format.__class__))
 
100
        check_format(repository.RepositoryFormat7(), "bar")
 
101
        
 
102
    def test_find_format_no_repository(self):
 
103
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
104
        self.assertRaises(errors.NoRepositoryPresent,
 
105
                          repository.RepositoryFormat.find_format,
 
106
                          dir)
 
107
 
 
108
    def test_find_format_unknown_format(self):
 
109
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
110
        SampleRepositoryFormat().initialize(dir)
 
111
        self.assertRaises(UnknownFormatError,
 
112
                          repository.RepositoryFormat.find_format,
 
113
                          dir)
 
114
 
 
115
    def test_register_unregister_format(self):
 
116
        format = SampleRepositoryFormat()
 
117
        # make a control dir
 
118
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
119
        # make a repo
 
120
        format.initialize(dir)
 
121
        # register a format for it.
 
122
        repository.RepositoryFormat.register_format(format)
 
123
        # which repository.Open will refuse (not supported)
 
124
        self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
 
125
        # but open(unsupported) will work
 
126
        self.assertEqual(format.open(dir), "opened repository.")
 
127
        # unregister the format
 
128
        repository.RepositoryFormat.unregister_format(format)
 
129
 
 
130
 
 
131
class TestFormat6(TestCaseWithTransport):
 
132
 
 
133
    def test_no_ancestry_weave(self):
 
134
        control = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
135
        repo = repository.RepositoryFormat6().initialize(control)
 
136
        # We no longer need to create the ancestry.weave file
 
137
        # since it is *never* used.
 
138
        self.assertRaises(NoSuchFile,
 
139
                          control.transport.get,
 
140
                          'ancestry.weave')
 
141
 
 
142
 
 
143
class TestFormat7(TestCaseWithTransport):
 
144
    
 
145
    def test_disk_layout(self):
 
146
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
147
        repo = repository.RepositoryFormat7().initialize(control)
 
148
        # in case of side effects of locking.
 
149
        repo.lock_write()
 
150
        repo.unlock()
 
151
        # we want:
 
152
        # format 'Bazaar-NG Repository format 7'
 
153
        # lock ''
 
154
        # inventory.weave == empty_weave
 
155
        # empty revision-store directory
 
156
        # empty weaves directory
 
157
        t = control.get_repository_transport(None)
 
158
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
159
                             t.get('format').read())
 
160
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
161
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
162
        self.assertEqualDiff('# bzr weave file v5\n'
 
163
                             'w\n'
 
164
                             'W\n',
 
165
                             t.get('inventory.weave').read())
 
166
 
 
167
    def test_shared_disk_layout(self):
 
168
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
169
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
 
170
        # we want:
 
171
        # format 'Bazaar-NG Repository format 7'
 
172
        # inventory.weave == empty_weave
 
173
        # empty revision-store directory
 
174
        # empty weaves directory
 
175
        # a 'shared-storage' marker file.
 
176
        # lock is not present when unlocked
 
177
        t = control.get_repository_transport(None)
 
178
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
179
                             t.get('format').read())
 
180
        self.assertEqualDiff('', t.get('shared-storage').read())
 
181
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
182
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
183
        self.assertEqualDiff('# bzr weave file v5\n'
 
184
                             'w\n'
 
185
                             'W\n',
 
186
                             t.get('inventory.weave').read())
 
187
        self.assertFalse(t.has('branch-lock'))
 
188
 
 
189
    def test_creates_lockdir(self):
 
190
        """Make sure it appears to be controlled by a LockDir existence"""
 
191
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
192
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
 
193
        t = control.get_repository_transport(None)
 
194
        # TODO: Should check there is a 'lock' toplevel directory, 
 
195
        # regardless of contents
 
196
        self.assertFalse(t.has('lock/held/info'))
 
197
        repo.lock_write()
 
198
        self.assertTrue(t.has('lock/held/info'))
 
199
 
 
200
    def test_uses_lockdir(self):
 
201
        """repo format 7 actually locks on lockdir"""
 
202
        base_url = self.get_url()
 
203
        control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
 
204
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
 
205
        t = control.get_repository_transport(None)
 
206
        repo.lock_write()
 
207
        repo.unlock()
 
208
        del repo
 
209
        # make sure the same lock is created by opening it
 
210
        repo = repository.Repository.open(base_url)
 
211
        repo.lock_write()
 
212
        self.assertTrue(t.has('lock/held/info'))
 
213
        repo.unlock()
 
214
        self.assertFalse(t.has('lock/held/info'))
 
215
 
 
216
    def test_shared_no_tree_disk_layout(self):
 
217
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
218
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
 
219
        repo.set_make_working_trees(False)
 
220
        # we want:
 
221
        # format 'Bazaar-NG Repository format 7'
 
222
        # lock ''
 
223
        # inventory.weave == empty_weave
 
224
        # empty revision-store directory
 
225
        # empty weaves directory
 
226
        # a 'shared-storage' marker file.
 
227
        t = control.get_repository_transport(None)
 
228
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
229
                             t.get('format').read())
 
230
        ## self.assertEqualDiff('', t.get('lock').read())
 
231
        self.assertEqualDiff('', t.get('shared-storage').read())
 
232
        self.assertEqualDiff('', t.get('no-working-trees').read())
 
233
        repo.set_make_working_trees(True)
 
234
        self.assertFalse(t.has('no-working-trees'))
 
235
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
236
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
237
        self.assertEqualDiff('# bzr weave file v5\n'
 
238
                             'w\n'
 
239
                             'W\n',
 
240
                             t.get('inventory.weave').read())
 
241
 
 
242
 
 
243
class TestFormatKnit1(TestCaseWithTransport):
 
244
    
 
245
    def test_disk_layout(self):
 
246
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
247
        repo = repository.RepositoryFormatKnit1().initialize(control)
 
248
        # in case of side effects of locking.
 
249
        repo.lock_write()
 
250
        repo.unlock()
 
251
        # we want:
 
252
        # format 'Bazaar-NG Knit Repository Format 1'
 
253
        # lock: is a directory
 
254
        # inventory.weave == empty_weave
 
255
        # empty revision-store directory
 
256
        # empty weaves directory
 
257
        t = control.get_repository_transport(None)
 
258
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
259
                             t.get('format').read())
 
260
        # XXX: no locks left when unlocked at the moment
 
261
        # self.assertEqualDiff('', t.get('lock').read())
 
262
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
263
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
264
        self.assertTrue(S_ISDIR(t.stat('lock').st_mode))
 
265
        # cheating and using a weave for now.
 
266
        self.assertEqualDiff('# bzr weave file v5\n'
 
267
                             'w\n'
 
268
                             'W\n',
 
269
                             t.get('inventory.weave').read())
 
270
 
 
271
    def test_shared_disk_layout(self):
 
272
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
273
        repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
 
274
        # we want:
 
275
        # format 'Bazaar-NG Knit Repository Format 1'
 
276
        # lock: is a directory
 
277
        # inventory.weave == empty_weave
 
278
        # empty revision-store directory
 
279
        # empty weaves directory
 
280
        # a 'shared-storage' marker file.
 
281
        t = control.get_repository_transport(None)
 
282
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
283
                             t.get('format').read())
 
284
        # XXX: no locks left when unlocked at the moment
 
285
        # self.assertEqualDiff('', t.get('lock').read())
 
286
        self.assertEqualDiff('', t.get('shared-storage').read())
 
287
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
288
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
289
        self.assertTrue(S_ISDIR(t.stat('lock').st_mode))
 
290
        # cheating and using a weave for now.
 
291
        self.assertEqualDiff('# bzr weave file v5\n'
 
292
                             'w\n'
 
293
                             'W\n',
 
294
                             t.get('inventory.weave').read())
 
295
 
 
296
    def test_shared_no_tree_disk_layout(self):
 
297
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
298
        repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
 
299
        repo.set_make_working_trees(False)
 
300
        # we want:
 
301
        # format 'Bazaar-NG Knit Repository Format 1'
 
302
        # lock ''
 
303
        # inventory.weave == empty_weave
 
304
        # empty revision-store directory
 
305
        # empty weaves directory
 
306
        # a 'shared-storage' marker file.
 
307
        t = control.get_repository_transport(None)
 
308
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
309
                             t.get('format').read())
 
310
        # XXX: no locks left when unlocked at the moment
 
311
        # self.assertEqualDiff('', t.get('lock').read())
 
312
        self.assertEqualDiff('', t.get('shared-storage').read())
 
313
        self.assertEqualDiff('', t.get('no-working-trees').read())
 
314
        repo.set_make_working_trees(True)
 
315
        self.assertFalse(t.has('no-working-trees'))
 
316
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
317
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
318
        self.assertTrue(S_ISDIR(t.stat('lock').st_mode))
 
319
        # cheating and using a weave for now.
 
320
        self.assertEqualDiff('# bzr weave file v5\n'
 
321
                             'w\n'
 
322
                             'W\n',
 
323
                             t.get('inventory.weave').read())
 
324
 
 
325
 
 
326
class InterString(repository.InterRepository):
 
327
    """An inter-repository optimised code path for strings.
 
328
 
 
329
    This is for use during testing where we use strings as repositories
 
330
    so that none of the default regsitered inter-repository classes will
 
331
    match.
 
332
    """
 
333
 
 
334
    @staticmethod
 
335
    def is_compatible(repo_source, repo_target):
 
336
        """InterString is compatible with strings-as-repos."""
 
337
        return isinstance(repo_source, str) and isinstance(repo_target, str)
 
338
 
 
339
 
 
340
class TestInterRepository(TestCaseWithTransport):
 
341
 
 
342
    def test_get_default_inter_repository(self):
 
343
        # test that the InterRepository.get(repo_a, repo_b) probes
 
344
        # for a inter_repo class where is_compatible(repo_a, repo_b) returns
 
345
        # true and returns a default inter_repo otherwise.
 
346
        # This also tests that the default registered optimised interrepository
 
347
        # classes do not barf inappropriately when a surprising repository type
 
348
        # is handed to them.
 
349
        dummy_a = "Repository 1."
 
350
        dummy_b = "Repository 2."
 
351
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
 
352
 
 
353
    def assertGetsDefaultInterRepository(self, repo_a, repo_b):
 
354
        """Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
 
355
        inter_repo = repository.InterRepository.get(repo_a, repo_b)
 
356
        self.assertEqual(repository.InterRepository,
 
357
                         inter_repo.__class__)
 
358
        self.assertEqual(repo_a, inter_repo.source)
 
359
        self.assertEqual(repo_b, inter_repo.target)
 
360
 
 
361
    def test_register_inter_repository_class(self):
 
362
        # test that a optimised code path provider - a
 
363
        # InterRepository subclass can be registered and unregistered
 
364
        # and that it is correctly selected when given a repository
 
365
        # pair that it returns true on for the is_compatible static method
 
366
        # check
 
367
        dummy_a = "Repository 1."
 
368
        dummy_b = "Repository 2."
 
369
        repository.InterRepository.register_optimiser(InterString)
 
370
        try:
 
371
            # we should get the default for something InterString returns False
 
372
            # to
 
373
            self.assertFalse(InterString.is_compatible(dummy_a, None))
 
374
            self.assertGetsDefaultInterRepository(dummy_a, None)
 
375
            # and we should get an InterString for a pair it 'likes'
 
376
            self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
 
377
            inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
 
378
            self.assertEqual(InterString, inter_repo.__class__)
 
379
            self.assertEqual(dummy_a, inter_repo.source)
 
380
            self.assertEqual(dummy_b, inter_repo.target)
 
381
        finally:
 
382
            repository.InterRepository.unregister_optimiser(InterString)
 
383
        # now we should get the default InterRepository object again.
 
384
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
 
385
 
 
386
 
 
387
class TestInterWeaveRepo(TestCaseWithTransport):
 
388
 
 
389
    def test_is_compatible_and_registered(self):
 
390
        # InterWeaveRepo is compatible when either side
 
391
        # is a format 5/6/7 branch
 
392
        formats = [repository.RepositoryFormat5(),
 
393
                   repository.RepositoryFormat6(),
 
394
                   repository.RepositoryFormat7()]
 
395
        incompatible_formats = [repository.RepositoryFormat4(),
 
396
                                repository.RepositoryFormatKnit1(),
 
397
                                ]
 
398
        repo_a = self.make_repository('a')
 
399
        repo_b = self.make_repository('b')
 
400
        is_compatible = repository.InterWeaveRepo.is_compatible
 
401
        for source in incompatible_formats:
 
402
            # force incompatible left then right
 
403
            repo_a._format = source
 
404
            repo_b._format = formats[0]
 
405
            self.assertFalse(is_compatible(repo_a, repo_b))
 
406
            self.assertFalse(is_compatible(repo_b, repo_a))
 
407
        for source in formats:
 
408
            repo_a._format = source
 
409
            for target in formats:
 
410
                repo_b._format = target
 
411
                self.assertTrue(is_compatible(repo_a, repo_b))
 
412
        self.assertEqual(repository.InterWeaveRepo,
 
413
                         repository.InterRepository.get(repo_a,
 
414
                                                        repo_b).__class__)
 
415
 
 
416
 
 
417
class TestRepositoryConverter(TestCaseWithTransport):
 
418
 
 
419
    def test_convert_empty(self):
 
420
        t = get_transport(self.get_url('.'))
 
421
        t.mkdir('repository')
 
422
        repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
 
423
        repo = repository.RepositoryFormat7().initialize(repo_dir)
 
424
        target_format = repository.RepositoryFormatKnit1()
 
425
        pb = bzrlib.ui.ui_factory.progress_bar()
 
426
 
 
427
        converter = repository.CopyConverter(target_format)
 
428
        converter.convert(repo, pb)
 
429
        repo = repo_dir.open_repository()
 
430
        self.assertTrue(isinstance(target_format, repo._format.__class__))
 
431