1
# Copyright (C) 2006, 2007 Canonical Ltd
 
 
3
# This program is free software; you can redistribute it and/or modify
 
 
4
# it under the terms of the GNU General Public License as published by
 
 
5
# the Free Software Foundation; either version 2 of the License, or
 
 
6
# (at your option) any later version.
 
 
8
# This program is distributed in the hope that it will be useful,
 
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
 
11
# GNU General Public License for more details.
 
 
13
# You should have received a copy of the GNU General Public License
 
 
14
# along with this program; if not, write to the Free Software
 
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 
17
"""Tests for the Repository facility that are not interface tests.
 
 
19
For interface tests see tests/repository_implementations/*.py.
 
 
21
For concrete class tests see this file, and for storage formats tests
 
 
25
from stat import S_ISDIR
 
 
26
from StringIO import StringIO
 
 
28
from bzrlib import symbol_versioning
 
 
30
import bzrlib.bzrdir as bzrdir
 
 
31
import bzrlib.errors as errors
 
 
32
from bzrlib.errors import (NotBranchError,
 
 
35
                           UnsupportedFormatError,
 
 
37
from bzrlib.repository import RepositoryFormat
 
 
38
from bzrlib.tests import TestCase, TestCaseWithTransport
 
 
39
from bzrlib.transport import get_transport
 
 
40
from bzrlib.transport.memory import MemoryServer
 
 
46
from bzrlib.repofmt import knitrepo, weaverepo
 
 
49
class TestDefaultFormat(TestCase):
 
 
51
    def test_get_set_default_format(self):
 
 
52
        old_default = bzrdir.format_registry.get('default')
 
 
53
        private_default = old_default().repository_format.__class__
 
 
54
        old_format = repository.RepositoryFormat.get_default_format()
 
 
55
        self.assertTrue(isinstance(old_format, private_default))
 
 
56
        def make_sample_bzrdir():
 
 
57
            my_bzrdir = bzrdir.BzrDirMetaFormat1()
 
 
58
            my_bzrdir.repository_format = SampleRepositoryFormat()
 
 
60
        bzrdir.format_registry.remove('default')
 
 
61
        bzrdir.format_registry.register('sample', make_sample_bzrdir, '')
 
 
62
        bzrdir.format_registry.set_default('sample')
 
 
63
        # creating a repository should now create an instrumented dir.
 
 
65
            # the default branch format is used by the meta dir format
 
 
66
            # which is not the default bzrdir format at this point
 
 
67
            dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
 
 
68
            result = dir.create_repository()
 
 
69
            self.assertEqual(result, 'A bzr repository dir')
 
 
71
            bzrdir.format_registry.remove('default')
 
 
72
            bzrdir.format_registry.remove('sample')
 
 
73
            bzrdir.format_registry.register('default', old_default, '')
 
 
74
        self.assertIsInstance(repository.RepositoryFormat.get_default_format(),
 
 
78
class SampleRepositoryFormat(repository.RepositoryFormat):
 
 
81
    this format is initializable, unsupported to aid in testing the 
 
 
82
    open and open(unsupported=True) routines.
 
 
85
    def get_format_string(self):
 
 
86
        """See RepositoryFormat.get_format_string()."""
 
 
87
        return "Sample .bzr repository format."
 
 
89
    def initialize(self, a_bzrdir, shared=False):
 
 
90
        """Initialize a repository in a BzrDir"""
 
 
91
        t = a_bzrdir.get_repository_transport(self)
 
 
92
        t.put_bytes('format', self.get_format_string())
 
 
93
        return 'A bzr repository dir'
 
 
95
    def is_supported(self):
 
 
98
    def open(self, a_bzrdir, _found=False):
 
 
99
        return "opened repository."
 
 
102
class TestRepositoryFormat(TestCaseWithTransport):
 
 
103
    """Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
 
 
105
    def test_find_format(self):
 
 
106
        # is the right format object found for a repository?
 
 
107
        # create a branch with a few known format objects.
 
 
108
        # this is not quite the same as 
 
 
109
        self.build_tree(["foo/", "bar/"])
 
 
110
        def check_format(format, url):
 
 
111
            dir = format._matchingbzrdir.initialize(url)
 
 
112
            format.initialize(dir)
 
 
113
            t = get_transport(url)
 
 
114
            found_format = repository.RepositoryFormat.find_format(dir)
 
 
115
            self.failUnless(isinstance(found_format, format.__class__))
 
 
116
        check_format(weaverepo.RepositoryFormat7(), "bar")
 
 
118
    def test_find_format_no_repository(self):
 
 
119
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
 
120
        self.assertRaises(errors.NoRepositoryPresent,
 
 
121
                          repository.RepositoryFormat.find_format,
 
 
124
    def test_find_format_unknown_format(self):
 
 
125
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
 
126
        SampleRepositoryFormat().initialize(dir)
 
 
127
        self.assertRaises(UnknownFormatError,
 
 
128
                          repository.RepositoryFormat.find_format,
 
 
131
    def test_register_unregister_format(self):
 
 
132
        format = SampleRepositoryFormat()
 
 
134
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
 
136
        format.initialize(dir)
 
 
137
        # register a format for it.
 
 
138
        repository.RepositoryFormat.register_format(format)
 
 
139
        # which repository.Open will refuse (not supported)
 
 
140
        self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
 
 
141
        # but open(unsupported) will work
 
 
142
        self.assertEqual(format.open(dir), "opened repository.")
 
 
143
        # unregister the format
 
 
144
        repository.RepositoryFormat.unregister_format(format)
 
 
147
class TestFormat6(TestCaseWithTransport):
 
 
149
    def test_no_ancestry_weave(self):
 
 
150
        control = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
 
151
        repo = weaverepo.RepositoryFormat6().initialize(control)
 
 
152
        # We no longer need to create the ancestry.weave file
 
 
153
        # since it is *never* used.
 
 
154
        self.assertRaises(NoSuchFile,
 
 
155
                          control.transport.get,
 
 
159
class TestFormat7(TestCaseWithTransport):
 
 
161
    def test_disk_layout(self):
 
 
162
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
 
163
        repo = weaverepo.RepositoryFormat7().initialize(control)
 
 
164
        # in case of side effects of locking.
 
 
168
        # format 'Bazaar-NG Repository format 7'
 
 
170
        # inventory.weave == empty_weave
 
 
171
        # empty revision-store directory
 
 
172
        # empty weaves directory
 
 
173
        t = control.get_repository_transport(None)
 
 
174
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
 
175
                             t.get('format').read())
 
 
176
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
 
177
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
 
178
        self.assertEqualDiff('# bzr weave file v5\n'
 
 
181
                             t.get('inventory.weave').read())
 
 
183
    def test_shared_disk_layout(self):
 
 
184
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
 
185
        repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
 
 
187
        # format 'Bazaar-NG Repository format 7'
 
 
188
        # inventory.weave == empty_weave
 
 
189
        # empty revision-store directory
 
 
190
        # empty weaves directory
 
 
191
        # a 'shared-storage' marker file.
 
 
192
        # lock is not present when unlocked
 
 
193
        t = control.get_repository_transport(None)
 
 
194
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
 
195
                             t.get('format').read())
 
 
196
        self.assertEqualDiff('', t.get('shared-storage').read())
 
 
197
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
 
198
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
 
199
        self.assertEqualDiff('# bzr weave file v5\n'
 
 
202
                             t.get('inventory.weave').read())
 
 
203
        self.assertFalse(t.has('branch-lock'))
 
 
205
    def test_creates_lockdir(self):
 
 
206
        """Make sure it appears to be controlled by a LockDir existence"""
 
 
207
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
 
208
        repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
 
 
209
        t = control.get_repository_transport(None)
 
 
210
        # TODO: Should check there is a 'lock' toplevel directory, 
 
 
211
        # regardless of contents
 
 
212
        self.assertFalse(t.has('lock/held/info'))
 
 
215
            self.assertTrue(t.has('lock/held/info'))
 
 
217
            # unlock so we don't get a warning about failing to do so
 
 
220
    def test_uses_lockdir(self):
 
 
221
        """repo format 7 actually locks on lockdir"""
 
 
222
        base_url = self.get_url()
 
 
223
        control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
 
 
224
        repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
 
 
225
        t = control.get_repository_transport(None)
 
 
229
        # make sure the same lock is created by opening it
 
 
230
        repo = repository.Repository.open(base_url)
 
 
232
        self.assertTrue(t.has('lock/held/info'))
 
 
234
        self.assertFalse(t.has('lock/held/info'))
 
 
236
    def test_shared_no_tree_disk_layout(self):
 
 
237
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
 
238
        repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
 
 
239
        repo.set_make_working_trees(False)
 
 
241
        # format 'Bazaar-NG Repository format 7'
 
 
243
        # inventory.weave == empty_weave
 
 
244
        # empty revision-store directory
 
 
245
        # empty weaves directory
 
 
246
        # a 'shared-storage' marker file.
 
 
247
        t = control.get_repository_transport(None)
 
 
248
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
 
249
                             t.get('format').read())
 
 
250
        ## self.assertEqualDiff('', t.get('lock').read())
 
 
251
        self.assertEqualDiff('', t.get('shared-storage').read())
 
 
252
        self.assertEqualDiff('', t.get('no-working-trees').read())
 
 
253
        repo.set_make_working_trees(True)
 
 
254
        self.assertFalse(t.has('no-working-trees'))
 
 
255
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
 
256
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
 
257
        self.assertEqualDiff('# bzr weave file v5\n'
 
 
260
                             t.get('inventory.weave').read())
 
 
263
class TestFormatKnit1(TestCaseWithTransport):
 
 
265
    def test_disk_layout(self):
 
 
266
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
 
267
        repo = knitrepo.RepositoryFormatKnit1().initialize(control)
 
 
268
        # in case of side effects of locking.
 
 
272
        # format 'Bazaar-NG Knit Repository Format 1'
 
 
273
        # lock: is a directory
 
 
274
        # inventory.weave == empty_weave
 
 
275
        # empty revision-store directory
 
 
276
        # empty weaves directory
 
 
277
        t = control.get_repository_transport(None)
 
 
278
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
 
279
                             t.get('format').read())
 
 
280
        # XXX: no locks left when unlocked at the moment
 
 
281
        # self.assertEqualDiff('', t.get('lock').read())
 
 
282
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
 
285
    def assertHasKnit(self, t, knit_name):
 
 
286
        """Assert that knit_name exists on t."""
 
 
287
        self.assertEqualDiff('# bzr knit index 8\n',
 
 
288
                             t.get(knit_name + '.kndx').read())
 
 
290
        self.assertTrue(t.has(knit_name + '.knit'))
 
 
292
    def check_knits(self, t):
 
 
293
        """check knit content for a repository."""
 
 
294
        self.assertHasKnit(t, 'inventory')
 
 
295
        self.assertHasKnit(t, 'revisions')
 
 
296
        self.assertHasKnit(t, 'signatures')
 
 
298
    def test_shared_disk_layout(self):
 
 
299
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
 
300
        repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
 
 
302
        # format 'Bazaar-NG Knit Repository Format 1'
 
 
303
        # lock: is a directory
 
 
304
        # inventory.weave == empty_weave
 
 
305
        # empty revision-store directory
 
 
306
        # empty weaves directory
 
 
307
        # a 'shared-storage' marker file.
 
 
308
        t = control.get_repository_transport(None)
 
 
309
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
 
310
                             t.get('format').read())
 
 
311
        # XXX: no locks left when unlocked at the moment
 
 
312
        # self.assertEqualDiff('', t.get('lock').read())
 
 
313
        self.assertEqualDiff('', t.get('shared-storage').read())
 
 
314
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
 
317
    def test_shared_no_tree_disk_layout(self):
 
 
318
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
 
319
        repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
 
 
320
        repo.set_make_working_trees(False)
 
 
322
        # format 'Bazaar-NG Knit Repository Format 1'
 
 
324
        # inventory.weave == empty_weave
 
 
325
        # empty revision-store directory
 
 
326
        # empty weaves directory
 
 
327
        # a 'shared-storage' marker file.
 
 
328
        t = control.get_repository_transport(None)
 
 
329
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
 
330
                             t.get('format').read())
 
 
331
        # XXX: no locks left when unlocked at the moment
 
 
332
        # self.assertEqualDiff('', t.get('lock').read())
 
 
333
        self.assertEqualDiff('', t.get('shared-storage').read())
 
 
334
        self.assertEqualDiff('', t.get('no-working-trees').read())
 
 
335
        repo.set_make_working_trees(True)
 
 
336
        self.assertFalse(t.has('no-working-trees'))
 
 
337
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
 
341
class DummyRepository(object):
 
 
342
    """A dummy repository for testing."""
 
 
346
    def supports_rich_root(self):
 
 
350
class InterDummy(repository.InterRepository):
 
 
351
    """An inter-repository optimised code path for DummyRepository.
 
 
353
    This is for use during testing where we use DummyRepository as repositories
 
 
354
    so that none of the default regsitered inter-repository classes will
 
 
359
    def is_compatible(repo_source, repo_target):
 
 
360
        """InterDummy is compatible with DummyRepository."""
 
 
361
        return (isinstance(repo_source, DummyRepository) and 
 
 
362
            isinstance(repo_target, DummyRepository))
 
 
365
class TestInterRepository(TestCaseWithTransport):
 
 
367
    def test_get_default_inter_repository(self):
 
 
368
        # test that the InterRepository.get(repo_a, repo_b) probes
 
 
369
        # for a inter_repo class where is_compatible(repo_a, repo_b) returns
 
 
370
        # true and returns a default inter_repo otherwise.
 
 
371
        # This also tests that the default registered optimised interrepository
 
 
372
        # classes do not barf inappropriately when a surprising repository type
 
 
374
        dummy_a = DummyRepository()
 
 
375
        dummy_b = DummyRepository()
 
 
376
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
 
 
378
    def assertGetsDefaultInterRepository(self, repo_a, repo_b):
 
 
379
        """Asserts that InterRepository.get(repo_a, repo_b) -> the default.
 
 
381
        The effective default is now InterSameDataRepository because there is
 
 
382
        no actual sane default in the presence of incompatible data models.
 
 
384
        inter_repo = repository.InterRepository.get(repo_a, repo_b)
 
 
385
        self.assertEqual(repository.InterSameDataRepository,
 
 
386
                         inter_repo.__class__)
 
 
387
        self.assertEqual(repo_a, inter_repo.source)
 
 
388
        self.assertEqual(repo_b, inter_repo.target)
 
 
390
    def test_register_inter_repository_class(self):
 
 
391
        # test that a optimised code path provider - a
 
 
392
        # InterRepository subclass can be registered and unregistered
 
 
393
        # and that it is correctly selected when given a repository
 
 
394
        # pair that it returns true on for the is_compatible static method
 
 
396
        dummy_a = DummyRepository()
 
 
397
        dummy_b = DummyRepository()
 
 
398
        repo = self.make_repository('.')
 
 
399
        # hack dummies to look like repo somewhat.
 
 
400
        dummy_a._serializer = repo._serializer
 
 
401
        dummy_b._serializer = repo._serializer
 
 
402
        repository.InterRepository.register_optimiser(InterDummy)
 
 
404
            # we should get the default for something InterDummy returns False
 
 
406
            self.assertFalse(InterDummy.is_compatible(dummy_a, repo))
 
 
407
            self.assertGetsDefaultInterRepository(dummy_a, repo)
 
 
408
            # and we should get an InterDummy for a pair it 'likes'
 
 
409
            self.assertTrue(InterDummy.is_compatible(dummy_a, dummy_b))
 
 
410
            inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
 
 
411
            self.assertEqual(InterDummy, inter_repo.__class__)
 
 
412
            self.assertEqual(dummy_a, inter_repo.source)
 
 
413
            self.assertEqual(dummy_b, inter_repo.target)
 
 
415
            repository.InterRepository.unregister_optimiser(InterDummy)
 
 
416
        # now we should get the default InterRepository object again.
 
 
417
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
 
 
420
class TestInterWeaveRepo(TestCaseWithTransport):
 
 
422
    def test_is_compatible_and_registered(self):
 
 
423
        # InterWeaveRepo is compatible when either side
 
 
424
        # is a format 5/6/7 branch
 
 
425
        from bzrlib.repofmt import knitrepo, weaverepo
 
 
426
        formats = [weaverepo.RepositoryFormat5(),
 
 
427
                   weaverepo.RepositoryFormat6(),
 
 
428
                   weaverepo.RepositoryFormat7()]
 
 
429
        incompatible_formats = [weaverepo.RepositoryFormat4(),
 
 
430
                                knitrepo.RepositoryFormatKnit1(),
 
 
432
        repo_a = self.make_repository('a')
 
 
433
        repo_b = self.make_repository('b')
 
 
434
        is_compatible = repository.InterWeaveRepo.is_compatible
 
 
435
        for source in incompatible_formats:
 
 
436
            # force incompatible left then right
 
 
437
            repo_a._format = source
 
 
438
            repo_b._format = formats[0]
 
 
439
            self.assertFalse(is_compatible(repo_a, repo_b))
 
 
440
            self.assertFalse(is_compatible(repo_b, repo_a))
 
 
441
        for source in formats:
 
 
442
            repo_a._format = source
 
 
443
            for target in formats:
 
 
444
                repo_b._format = target
 
 
445
                self.assertTrue(is_compatible(repo_a, repo_b))
 
 
446
        self.assertEqual(repository.InterWeaveRepo,
 
 
447
                         repository.InterRepository.get(repo_a,
 
 
451
class TestRepositoryConverter(TestCaseWithTransport):
 
 
453
    def test_convert_empty(self):
 
 
454
        t = get_transport(self.get_url('.'))
 
 
455
        t.mkdir('repository')
 
 
456
        repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
 
 
457
        repo = weaverepo.RepositoryFormat7().initialize(repo_dir)
 
 
458
        target_format = knitrepo.RepositoryFormatKnit1()
 
 
459
        converter = repository.CopyConverter(target_format)
 
 
460
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
 
 
462
            converter.convert(repo, pb)
 
 
465
        repo = repo_dir.open_repository()
 
 
466
        self.assertTrue(isinstance(target_format, repo._format.__class__))
 
 
469
class TestMisc(TestCase):
 
 
471
    def test_unescape_xml(self):
 
 
472
        """We get some kind of error when malformed entities are passed"""
 
 
473
        self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;') 
 
 
476
class TestRepositoryFormatKnit3(TestCaseWithTransport):
 
 
478
    def test_convert(self):
 
 
479
        """Ensure the upgrade adds weaves for roots"""
 
 
480
        format = bzrdir.BzrDirMetaFormat1()
 
 
481
        format.repository_format = knitrepo.RepositoryFormatKnit1()
 
 
482
        tree = self.make_branch_and_tree('.', format)
 
 
483
        tree.commit("Dull commit", rev_id="dull")
 
 
484
        revision_tree = tree.branch.repository.revision_tree('dull')
 
 
485
        self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
 
 
486
            revision_tree.inventory.root.file_id)
 
 
487
        format = bzrdir.BzrDirMetaFormat1()
 
 
488
        format.repository_format = knitrepo.RepositoryFormatKnit3()
 
 
489
        upgrade.Convert('.', format)
 
 
490
        tree = workingtree.WorkingTree.open('.')
 
 
491
        revision_tree = tree.branch.repository.revision_tree('dull')
 
 
492
        revision_tree.get_file_lines(revision_tree.inventory.root.file_id)
 
 
493
        tree.commit("Another dull commit", rev_id='dull2')
 
 
494
        revision_tree = tree.branch.repository.revision_tree('dull2')
 
 
495
        self.assertEqual('dull', revision_tree.inventory.root.revision)