/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_repository.py

  • Committer: Robert Collins
  • Date: 2007-09-19 05:14:14 UTC
  • mto: (2835.1.1 ianc-integration)
  • mto: This revision was merged to the branch mainline in revision 2836.
  • Revision ID: robertc@robertcollins.net-20070919051414-2tgjqteg7k3ps4h0
* ``pull``, ``merge`` and ``push`` will no longer silently correct some
  repository index errors that occured as a result of the Weave disk format.
  Instead the ``reconcile`` command needs to be run to correct those
  problems if they exist (and it has been able to fix most such problems
  since bzr 0.8). Some new problems have been identified during this release
  and you should run ``bzr check`` once on every repository to see if you
  need to reconcile. If you cannot ``pull`` or ``merge`` from a remote
  repository due to mismatched parent errors - a symptom of index errors -
  you should simply take a full copy of that remote repository to a clean
  directory outside any local repositories, then run reconcile on it, and
  finally pull from it locally. (And naturally email the repositories owner
  to ask them to upgrade and run reconcile).
  (Robert Collins)

* ``VersionedFile.fix_parents`` has been removed as a harmful API.
  ``VersionedFile.join`` will no longer accept different parents on either
  side of a join - it will either ignore them, or error, depending on the
  implementation. See notes when upgrading for more information.
  (Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for the Repository facility that are not interface tests.
 
18
 
 
19
For interface tests see tests/repository_implementations/*.py.
 
20
 
 
21
For concrete class tests see this file, and for storage formats tests
 
22
also see this file.
 
23
"""
 
24
 
 
25
from stat import S_ISDIR
 
26
from StringIO import StringIO
 
27
 
 
28
from bzrlib import symbol_versioning
 
29
import bzrlib
 
30
import bzrlib.bzrdir as bzrdir
 
31
import bzrlib.errors as errors
 
32
from bzrlib.errors import (NotBranchError,
 
33
                           NoSuchFile,
 
34
                           UnknownFormatError,
 
35
                           UnsupportedFormatError,
 
36
                           )
 
37
from bzrlib.repository import RepositoryFormat
 
38
from bzrlib.tests import (
 
39
    TestCase,
 
40
    TestCaseWithTransport,
 
41
    test_knit,
 
42
    )
 
43
from bzrlib.transport import get_transport
 
44
from bzrlib.transport.memory import MemoryServer
 
45
from bzrlib.util import bencode
 
46
from bzrlib import (
 
47
    repository,
 
48
    upgrade,
 
49
    workingtree,
 
50
    )
 
51
from bzrlib.repofmt import knitrepo, weaverepo
 
52
 
 
53
 
 
54
class TestDefaultFormat(TestCase):
 
55
 
 
56
    def test_get_set_default_format(self):
 
57
        old_default = bzrdir.format_registry.get('default')
 
58
        private_default = old_default().repository_format.__class__
 
59
        old_format = repository.RepositoryFormat.get_default_format()
 
60
        self.assertTrue(isinstance(old_format, private_default))
 
61
        def make_sample_bzrdir():
 
62
            my_bzrdir = bzrdir.BzrDirMetaFormat1()
 
63
            my_bzrdir.repository_format = SampleRepositoryFormat()
 
64
            return my_bzrdir
 
65
        bzrdir.format_registry.remove('default')
 
66
        bzrdir.format_registry.register('sample', make_sample_bzrdir, '')
 
67
        bzrdir.format_registry.set_default('sample')
 
68
        # creating a repository should now create an instrumented dir.
 
69
        try:
 
70
            # the default branch format is used by the meta dir format
 
71
            # which is not the default bzrdir format at this point
 
72
            dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
 
73
            result = dir.create_repository()
 
74
            self.assertEqual(result, 'A bzr repository dir')
 
75
        finally:
 
76
            bzrdir.format_registry.remove('default')
 
77
            bzrdir.format_registry.remove('sample')
 
78
            bzrdir.format_registry.register('default', old_default, '')
 
79
        self.assertIsInstance(repository.RepositoryFormat.get_default_format(),
 
80
                              old_format.__class__)
 
81
 
 
82
 
 
83
class SampleRepositoryFormat(repository.RepositoryFormat):
 
84
    """A sample format
 
85
 
 
86
    this format is initializable, unsupported to aid in testing the 
 
87
    open and open(unsupported=True) routines.
 
88
    """
 
89
 
 
90
    def get_format_string(self):
 
91
        """See RepositoryFormat.get_format_string()."""
 
92
        return "Sample .bzr repository format."
 
93
 
 
94
    def initialize(self, a_bzrdir, shared=False):
 
95
        """Initialize a repository in a BzrDir"""
 
96
        t = a_bzrdir.get_repository_transport(self)
 
97
        t.put_bytes('format', self.get_format_string())
 
98
        return 'A bzr repository dir'
 
99
 
 
100
    def is_supported(self):
 
101
        return False
 
102
 
 
103
    def open(self, a_bzrdir, _found=False):
 
104
        return "opened repository."
 
105
 
 
106
 
 
107
class TestRepositoryFormat(TestCaseWithTransport):
 
108
    """Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
 
109
 
 
110
    def test_find_format(self):
 
111
        # is the right format object found for a repository?
 
112
        # create a branch with a few known format objects.
 
113
        # this is not quite the same as 
 
114
        self.build_tree(["foo/", "bar/"])
 
115
        def check_format(format, url):
 
116
            dir = format._matchingbzrdir.initialize(url)
 
117
            format.initialize(dir)
 
118
            t = get_transport(url)
 
119
            found_format = repository.RepositoryFormat.find_format(dir)
 
120
            self.failUnless(isinstance(found_format, format.__class__))
 
121
        check_format(weaverepo.RepositoryFormat7(), "bar")
 
122
        
 
123
    def test_find_format_no_repository(self):
 
124
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
125
        self.assertRaises(errors.NoRepositoryPresent,
 
126
                          repository.RepositoryFormat.find_format,
 
127
                          dir)
 
128
 
 
129
    def test_find_format_unknown_format(self):
 
130
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
131
        SampleRepositoryFormat().initialize(dir)
 
132
        self.assertRaises(UnknownFormatError,
 
133
                          repository.RepositoryFormat.find_format,
 
134
                          dir)
 
135
 
 
136
    def test_register_unregister_format(self):
 
137
        format = SampleRepositoryFormat()
 
138
        # make a control dir
 
139
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
140
        # make a repo
 
141
        format.initialize(dir)
 
142
        # register a format for it.
 
143
        repository.RepositoryFormat.register_format(format)
 
144
        # which repository.Open will refuse (not supported)
 
145
        self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
 
146
        # but open(unsupported) will work
 
147
        self.assertEqual(format.open(dir), "opened repository.")
 
148
        # unregister the format
 
149
        repository.RepositoryFormat.unregister_format(format)
 
150
 
 
151
 
 
152
class TestFormat6(TestCaseWithTransport):
 
153
 
 
154
    def test_no_ancestry_weave(self):
 
155
        control = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
156
        repo = weaverepo.RepositoryFormat6().initialize(control)
 
157
        # We no longer need to create the ancestry.weave file
 
158
        # since it is *never* used.
 
159
        self.assertRaises(NoSuchFile,
 
160
                          control.transport.get,
 
161
                          'ancestry.weave')
 
162
 
 
163
 
 
164
class TestFormat7(TestCaseWithTransport):
 
165
    
 
166
    def test_disk_layout(self):
 
167
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
168
        repo = weaverepo.RepositoryFormat7().initialize(control)
 
169
        # in case of side effects of locking.
 
170
        repo.lock_write()
 
171
        repo.unlock()
 
172
        # we want:
 
173
        # format 'Bazaar-NG Repository format 7'
 
174
        # lock ''
 
175
        # inventory.weave == empty_weave
 
176
        # empty revision-store directory
 
177
        # empty weaves directory
 
178
        t = control.get_repository_transport(None)
 
179
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
180
                             t.get('format').read())
 
181
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
182
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
183
        self.assertEqualDiff('# bzr weave file v5\n'
 
184
                             'w\n'
 
185
                             'W\n',
 
186
                             t.get('inventory.weave').read())
 
187
 
 
188
    def test_shared_disk_layout(self):
 
189
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
190
        repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
 
191
        # we want:
 
192
        # format 'Bazaar-NG Repository format 7'
 
193
        # inventory.weave == empty_weave
 
194
        # empty revision-store directory
 
195
        # empty weaves directory
 
196
        # a 'shared-storage' marker file.
 
197
        # lock is not present when unlocked
 
198
        t = control.get_repository_transport(None)
 
199
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
200
                             t.get('format').read())
 
201
        self.assertEqualDiff('', t.get('shared-storage').read())
 
202
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
203
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
204
        self.assertEqualDiff('# bzr weave file v5\n'
 
205
                             'w\n'
 
206
                             'W\n',
 
207
                             t.get('inventory.weave').read())
 
208
        self.assertFalse(t.has('branch-lock'))
 
209
 
 
210
    def test_creates_lockdir(self):
 
211
        """Make sure it appears to be controlled by a LockDir existence"""
 
212
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
213
        repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
 
214
        t = control.get_repository_transport(None)
 
215
        # TODO: Should check there is a 'lock' toplevel directory, 
 
216
        # regardless of contents
 
217
        self.assertFalse(t.has('lock/held/info'))
 
218
        repo.lock_write()
 
219
        try:
 
220
            self.assertTrue(t.has('lock/held/info'))
 
221
        finally:
 
222
            # unlock so we don't get a warning about failing to do so
 
223
            repo.unlock()
 
224
 
 
225
    def test_uses_lockdir(self):
 
226
        """repo format 7 actually locks on lockdir"""
 
227
        base_url = self.get_url()
 
228
        control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
 
229
        repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
 
230
        t = control.get_repository_transport(None)
 
231
        repo.lock_write()
 
232
        repo.unlock()
 
233
        del repo
 
234
        # make sure the same lock is created by opening it
 
235
        repo = repository.Repository.open(base_url)
 
236
        repo.lock_write()
 
237
        self.assertTrue(t.has('lock/held/info'))
 
238
        repo.unlock()
 
239
        self.assertFalse(t.has('lock/held/info'))
 
240
 
 
241
    def test_shared_no_tree_disk_layout(self):
 
242
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
243
        repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
 
244
        repo.set_make_working_trees(False)
 
245
        # we want:
 
246
        # format 'Bazaar-NG Repository format 7'
 
247
        # lock ''
 
248
        # inventory.weave == empty_weave
 
249
        # empty revision-store directory
 
250
        # empty weaves directory
 
251
        # a 'shared-storage' marker file.
 
252
        t = control.get_repository_transport(None)
 
253
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
254
                             t.get('format').read())
 
255
        ## self.assertEqualDiff('', t.get('lock').read())
 
256
        self.assertEqualDiff('', t.get('shared-storage').read())
 
257
        self.assertEqualDiff('', t.get('no-working-trees').read())
 
258
        repo.set_make_working_trees(True)
 
259
        self.assertFalse(t.has('no-working-trees'))
 
260
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
261
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
262
        self.assertEqualDiff('# bzr weave file v5\n'
 
263
                             'w\n'
 
264
                             'W\n',
 
265
                             t.get('inventory.weave').read())
 
266
 
 
267
 
 
268
class TestFormatKnit1(TestCaseWithTransport):
 
269
    
 
270
    def test_disk_layout(self):
 
271
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
272
        repo = knitrepo.RepositoryFormatKnit1().initialize(control)
 
273
        # in case of side effects of locking.
 
274
        repo.lock_write()
 
275
        repo.unlock()
 
276
        # we want:
 
277
        # format 'Bazaar-NG Knit Repository Format 1'
 
278
        # lock: is a directory
 
279
        # inventory.weave == empty_weave
 
280
        # empty revision-store directory
 
281
        # empty weaves directory
 
282
        t = control.get_repository_transport(None)
 
283
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
284
                             t.get('format').read())
 
285
        # XXX: no locks left when unlocked at the moment
 
286
        # self.assertEqualDiff('', t.get('lock').read())
 
287
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
288
        self.check_knits(t)
 
289
 
 
290
    def assertHasKnit(self, t, knit_name):
 
291
        """Assert that knit_name exists on t."""
 
292
        self.assertEqualDiff('# bzr knit index 8\n',
 
293
                             t.get(knit_name + '.kndx').read())
 
294
        # no default content
 
295
        self.assertTrue(t.has(knit_name + '.knit'))
 
296
 
 
297
    def check_knits(self, t):
 
298
        """check knit content for a repository."""
 
299
        self.assertHasKnit(t, 'inventory')
 
300
        self.assertHasKnit(t, 'revisions')
 
301
        self.assertHasKnit(t, 'signatures')
 
302
 
 
303
    def test_shared_disk_layout(self):
 
304
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
305
        repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
 
306
        # we want:
 
307
        # format 'Bazaar-NG Knit Repository Format 1'
 
308
        # lock: is a directory
 
309
        # inventory.weave == empty_weave
 
310
        # empty revision-store directory
 
311
        # empty weaves directory
 
312
        # a 'shared-storage' marker file.
 
313
        t = control.get_repository_transport(None)
 
314
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
315
                             t.get('format').read())
 
316
        # XXX: no locks left when unlocked at the moment
 
317
        # self.assertEqualDiff('', t.get('lock').read())
 
318
        self.assertEqualDiff('', t.get('shared-storage').read())
 
319
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
320
        self.check_knits(t)
 
321
 
 
322
    def test_shared_no_tree_disk_layout(self):
 
323
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
324
        repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
 
325
        repo.set_make_working_trees(False)
 
326
        # we want:
 
327
        # format 'Bazaar-NG Knit Repository Format 1'
 
328
        # lock ''
 
329
        # inventory.weave == empty_weave
 
330
        # empty revision-store directory
 
331
        # empty weaves directory
 
332
        # a 'shared-storage' marker file.
 
333
        t = control.get_repository_transport(None)
 
334
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
335
                             t.get('format').read())
 
336
        # XXX: no locks left when unlocked at the moment
 
337
        # self.assertEqualDiff('', t.get('lock').read())
 
338
        self.assertEqualDiff('', t.get('shared-storage').read())
 
339
        self.assertEqualDiff('', t.get('no-working-trees').read())
 
340
        repo.set_make_working_trees(True)
 
341
        self.assertFalse(t.has('no-working-trees'))
 
342
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
343
        self.check_knits(t)
 
344
 
 
345
 
 
346
class KnitRepositoryStreamTests(test_knit.KnitTests):
 
347
    """Tests for knitrepo._get_stream_as_bytes."""
 
348
 
 
349
    def test_get_stream_as_bytes(self):
 
350
        # Make a simple knit
 
351
        k1 = self.make_test_knit()
 
352
        k1.add_lines('text-a', [], test_knit.split_lines(test_knit.TEXT_1))
 
353
        
 
354
        # Serialise it, check the output.
 
355
        bytes = knitrepo._get_stream_as_bytes(k1, ['text-a'])
 
356
        data = bencode.bdecode(bytes)
 
357
        format, record = data
 
358
        self.assertEqual('knit-plain', format)
 
359
        self.assertEqual(['text-a', ['fulltext'], []], record[:3])
 
360
        self.assertRecordContentEqual(k1, 'text-a', record[3])
 
361
 
 
362
    def test_get_stream_as_bytes_all(self):
 
363
        """Get a serialised data stream for all the records in a knit.
 
364
 
 
365
        Much like test_get_stream_all, except for get_stream_as_bytes.
 
366
        """
 
367
        k1 = self.make_test_knit()
 
368
        # Insert the same data as BasicKnitTests.test_knit_join, as they seem
 
369
        # to cover a range of cases (no parents, one parent, multiple parents).
 
370
        test_data = [
 
371
            ('text-a', [], test_knit.TEXT_1),
 
372
            ('text-b', ['text-a'], test_knit.TEXT_1),
 
373
            ('text-c', [], test_knit.TEXT_1),
 
374
            ('text-d', ['text-c'], test_knit.TEXT_1),
 
375
            ('text-m', ['text-b', 'text-d'], test_knit.TEXT_1),
 
376
           ]
 
377
        expected_data_list = [
 
378
            # version, options, parents
 
379
            ('text-a', ['fulltext'], []),
 
380
            ('text-b', ['line-delta'], ['text-a']),
 
381
            ('text-c', ['fulltext'], []),
 
382
            ('text-d', ['line-delta'], ['text-c']),
 
383
            ('text-m', ['line-delta'], ['text-b', 'text-d']),
 
384
            ]
 
385
        for version_id, parents, lines in test_data:
 
386
            k1.add_lines(version_id, parents, test_knit.split_lines(lines))
 
387
 
 
388
        bytes = knitrepo._get_stream_as_bytes(
 
389
            k1, ['text-a', 'text-b', 'text-c', 'text-d', 'text-m'])
 
390
 
 
391
        data = bencode.bdecode(bytes)
 
392
        format = data.pop(0)
 
393
        self.assertEqual('knit-plain', format)
 
394
 
 
395
        for expected, actual in zip(expected_data_list, data):
 
396
            expected_version = expected[0]
 
397
            expected_options = expected[1]
 
398
            expected_parents = expected[2]
 
399
            version, options, parents, bytes = actual
 
400
            self.assertEqual(expected_version, version)
 
401
            self.assertEqual(expected_options, options)
 
402
            self.assertEqual(expected_parents, parents)
 
403
            self.assertRecordContentEqual(k1, version, bytes)
 
404
 
 
405
 
 
406
class DummyRepository(object):
 
407
    """A dummy repository for testing."""
 
408
 
 
409
    _serializer = None
 
410
 
 
411
    def supports_rich_root(self):
 
412
        return False
 
413
 
 
414
 
 
415
class InterDummy(repository.InterRepository):
 
416
    """An inter-repository optimised code path for DummyRepository.
 
417
 
 
418
    This is for use during testing where we use DummyRepository as repositories
 
419
    so that none of the default regsitered inter-repository classes will
 
420
    match.
 
421
    """
 
422
 
 
423
    @staticmethod
 
424
    def is_compatible(repo_source, repo_target):
 
425
        """InterDummy is compatible with DummyRepository."""
 
426
        return (isinstance(repo_source, DummyRepository) and 
 
427
            isinstance(repo_target, DummyRepository))
 
428
 
 
429
 
 
430
class TestInterRepository(TestCaseWithTransport):
 
431
 
 
432
    def test_get_default_inter_repository(self):
 
433
        # test that the InterRepository.get(repo_a, repo_b) probes
 
434
        # for a inter_repo class where is_compatible(repo_a, repo_b) returns
 
435
        # true and returns a default inter_repo otherwise.
 
436
        # This also tests that the default registered optimised interrepository
 
437
        # classes do not barf inappropriately when a surprising repository type
 
438
        # is handed to them.
 
439
        dummy_a = DummyRepository()
 
440
        dummy_b = DummyRepository()
 
441
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
 
442
 
 
443
    def assertGetsDefaultInterRepository(self, repo_a, repo_b):
 
444
        """Asserts that InterRepository.get(repo_a, repo_b) -> the default.
 
445
        
 
446
        The effective default is now InterSameDataRepository because there is
 
447
        no actual sane default in the presence of incompatible data models.
 
448
        """
 
449
        inter_repo = repository.InterRepository.get(repo_a, repo_b)
 
450
        self.assertEqual(repository.InterSameDataRepository,
 
451
                         inter_repo.__class__)
 
452
        self.assertEqual(repo_a, inter_repo.source)
 
453
        self.assertEqual(repo_b, inter_repo.target)
 
454
 
 
455
    def test_register_inter_repository_class(self):
 
456
        # test that a optimised code path provider - a
 
457
        # InterRepository subclass can be registered and unregistered
 
458
        # and that it is correctly selected when given a repository
 
459
        # pair that it returns true on for the is_compatible static method
 
460
        # check
 
461
        dummy_a = DummyRepository()
 
462
        dummy_b = DummyRepository()
 
463
        repo = self.make_repository('.')
 
464
        # hack dummies to look like repo somewhat.
 
465
        dummy_a._serializer = repo._serializer
 
466
        dummy_b._serializer = repo._serializer
 
467
        repository.InterRepository.register_optimiser(InterDummy)
 
468
        try:
 
469
            # we should get the default for something InterDummy returns False
 
470
            # to
 
471
            self.assertFalse(InterDummy.is_compatible(dummy_a, repo))
 
472
            self.assertGetsDefaultInterRepository(dummy_a, repo)
 
473
            # and we should get an InterDummy for a pair it 'likes'
 
474
            self.assertTrue(InterDummy.is_compatible(dummy_a, dummy_b))
 
475
            inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
 
476
            self.assertEqual(InterDummy, inter_repo.__class__)
 
477
            self.assertEqual(dummy_a, inter_repo.source)
 
478
            self.assertEqual(dummy_b, inter_repo.target)
 
479
        finally:
 
480
            repository.InterRepository.unregister_optimiser(InterDummy)
 
481
        # now we should get the default InterRepository object again.
 
482
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
 
483
 
 
484
 
 
485
class TestInterWeaveRepo(TestCaseWithTransport):
 
486
 
 
487
    def test_is_compatible_and_registered(self):
 
488
        # InterWeaveRepo is compatible when either side
 
489
        # is a format 5/6/7 branch
 
490
        from bzrlib.repofmt import knitrepo, weaverepo
 
491
        formats = [weaverepo.RepositoryFormat5(),
 
492
                   weaverepo.RepositoryFormat6(),
 
493
                   weaverepo.RepositoryFormat7()]
 
494
        incompatible_formats = [weaverepo.RepositoryFormat4(),
 
495
                                knitrepo.RepositoryFormatKnit1(),
 
496
                                ]
 
497
        repo_a = self.make_repository('a')
 
498
        repo_b = self.make_repository('b')
 
499
        is_compatible = repository.InterWeaveRepo.is_compatible
 
500
        for source in incompatible_formats:
 
501
            # force incompatible left then right
 
502
            repo_a._format = source
 
503
            repo_b._format = formats[0]
 
504
            self.assertFalse(is_compatible(repo_a, repo_b))
 
505
            self.assertFalse(is_compatible(repo_b, repo_a))
 
506
        for source in formats:
 
507
            repo_a._format = source
 
508
            for target in formats:
 
509
                repo_b._format = target
 
510
                self.assertTrue(is_compatible(repo_a, repo_b))
 
511
        self.assertEqual(repository.InterWeaveRepo,
 
512
                         repository.InterRepository.get(repo_a,
 
513
                                                        repo_b).__class__)
 
514
 
 
515
 
 
516
class TestRepositoryConverter(TestCaseWithTransport):
 
517
 
 
518
    def test_convert_empty(self):
 
519
        t = get_transport(self.get_url('.'))
 
520
        t.mkdir('repository')
 
521
        repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
 
522
        repo = weaverepo.RepositoryFormat7().initialize(repo_dir)
 
523
        target_format = knitrepo.RepositoryFormatKnit1()
 
524
        converter = repository.CopyConverter(target_format)
 
525
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
 
526
        try:
 
527
            converter.convert(repo, pb)
 
528
        finally:
 
529
            pb.finished()
 
530
        repo = repo_dir.open_repository()
 
531
        self.assertTrue(isinstance(target_format, repo._format.__class__))
 
532
 
 
533
 
 
534
class TestMisc(TestCase):
 
535
    
 
536
    def test_unescape_xml(self):
 
537
        """We get some kind of error when malformed entities are passed"""
 
538
        self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;') 
 
539
 
 
540
 
 
541
class TestRepositoryFormatKnit3(TestCaseWithTransport):
 
542
 
 
543
    def test_convert(self):
 
544
        """Ensure the upgrade adds weaves for roots"""
 
545
        format = bzrdir.BzrDirMetaFormat1()
 
546
        format.repository_format = knitrepo.RepositoryFormatKnit1()
 
547
        tree = self.make_branch_and_tree('.', format)
 
548
        tree.commit("Dull commit", rev_id="dull")
 
549
        revision_tree = tree.branch.repository.revision_tree('dull')
 
550
        self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
 
551
            revision_tree.inventory.root.file_id)
 
552
        format = bzrdir.BzrDirMetaFormat1()
 
553
        format.repository_format = knitrepo.RepositoryFormatKnit3()
 
554
        upgrade.Convert('.', format)
 
555
        tree = workingtree.WorkingTree.open('.')
 
556
        revision_tree = tree.branch.repository.revision_tree('dull')
 
557
        revision_tree.get_file_lines(revision_tree.inventory.root.file_id)
 
558
        tree.commit("Another dull commit", rev_id='dull2')
 
559
        revision_tree = tree.branch.repository.revision_tree('dull2')
 
560
        self.assertEqual('dull', revision_tree.inventory.root.revision)
 
561