/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_smart.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-12-21 06:03:07 UTC
  • mfrom: (4665.7.3 serve-init)
  • Revision ID: pqm@pqm.ubuntu.com-20091221060307-uvja3vdy1o6dzzy0
(mbp) example debian init script

Show diffs side-by-side

added added

removed removed

Lines of Context:
36
36
    smart,
37
37
    tests,
38
38
    urlutils,
 
39
    versionedfile,
39
40
    )
40
41
from bzrlib.branch import Branch, BranchReferenceFormat
41
42
import bzrlib.smart.branch
42
43
import bzrlib.smart.bzrdir, bzrlib.smart.bzrdir as smart_dir
43
44
import bzrlib.smart.packrepository
44
45
import bzrlib.smart.repository
 
46
import bzrlib.smart.vfs
45
47
from bzrlib.smart.request import (
46
48
    FailedSmartServerResponse,
47
49
    SmartServerRequest,
51
53
from bzrlib.tests import (
52
54
    split_suite_by_re,
53
55
    )
54
 
from bzrlib.transport import chroot, get_transport
 
56
from bzrlib.transport import chroot, get_transport, local, memory
55
57
 
56
58
 
57
59
def load_tests(standard_tests, module, loader):
80
82
class TestCaseWithChrootedTransport(tests.TestCaseWithTransport):
81
83
 
82
84
    def setUp(self):
 
85
        self.vfs_transport_factory = memory.MemoryServer
83
86
        tests.TestCaseWithTransport.setUp(self)
84
87
        self._chroot_server = None
85
88
 
94
97
        return t
95
98
 
96
99
 
97
 
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
 
100
class TestCaseWithSmartMedium(tests.TestCaseWithMemoryTransport):
98
101
 
99
102
    def setUp(self):
100
103
        super(TestCaseWithSmartMedium, self).setUp()
112
115
        return self.get_transport().get_smart_medium()
113
116
 
114
117
 
 
118
class TestByteStreamToStream(tests.TestCase):
 
119
 
 
120
    def test_repeated_substreams_same_kind_are_one_stream(self):
 
121
        # Make a stream - an iterable of bytestrings.
 
122
        stream = [('text', [versionedfile.FulltextContentFactory(('k1',), None,
 
123
            None, 'foo')]),('text', [
 
124
            versionedfile.FulltextContentFactory(('k2',), None, None, 'bar')])]
 
125
        fmt = bzrdir.format_registry.get('pack-0.92')().repository_format
 
126
        bytes = smart.repository._stream_to_byte_stream(stream, fmt)
 
127
        streams = []
 
128
        # Iterate the resulting iterable; checking that we get only one stream
 
129
        # out.
 
130
        fmt, stream = smart.repository._byte_stream_to_stream(bytes)
 
131
        for kind, substream in stream:
 
132
            streams.append((kind, list(substream)))
 
133
        self.assertLength(1, streams)
 
134
        self.assertLength(2, streams[0][1])
 
135
 
 
136
 
115
137
class TestSmartServerResponse(tests.TestCase):
116
138
 
117
139
    def test__eq__(self):
149
171
        self.assertRaises(
150
172
            errors.PathNotChild, request.translate_client_path, 'bar/')
151
173
        self.assertEqual('./baz', request.translate_client_path('foo/baz'))
 
174
        e_acute = u'\N{LATIN SMALL LETTER E WITH ACUTE}'.encode('utf-8')
 
175
        self.assertEqual('./' + urlutils.escape(e_acute),
 
176
                         request.translate_client_path('foo/' + e_acute))
 
177
 
 
178
    def test_translate_client_path_vfs(self):
 
179
        """VfsRequests receive escaped paths rather than raw UTF-8."""
 
180
        transport = self.get_transport()
 
181
        request = smart.vfs.VfsRequest(transport, 'foo/')
 
182
        e_acute = u'\N{LATIN SMALL LETTER E WITH ACUTE}'.encode('utf-8')
 
183
        escaped = urlutils.escape('foo/' + e_acute)
 
184
        self.assertEqual('./' + urlutils.escape(e_acute),
 
185
                         request.translate_client_path(escaped))
152
186
 
153
187
    def test_transport_from_client_path(self):
154
188
        transport = self.get_transport()
393
427
            'False', 'False', 'False', '', '', '', '', 'False')
394
428
 
395
429
 
 
430
class TestSmartServerRequestOpenBzrDir(tests.TestCaseWithMemoryTransport):
 
431
    
 
432
    def test_no_directory(self):
 
433
        backing = self.get_transport()
 
434
        request = smart.bzrdir.SmartServerRequestOpenBzrDir(backing)
 
435
        self.assertEqual(SmartServerResponse(('no', )),
 
436
            request.execute('does-not-exist'))
 
437
 
 
438
    def test_empty_directory(self):
 
439
        backing = self.get_transport()
 
440
        backing.mkdir('empty')
 
441
        request = smart.bzrdir.SmartServerRequestOpenBzrDir(backing)
 
442
        self.assertEqual(SmartServerResponse(('no', )),
 
443
            request.execute('empty'))
 
444
 
 
445
    def test_outside_root_client_path(self):
 
446
        backing = self.get_transport()
 
447
        request = smart.bzrdir.SmartServerRequestOpenBzrDir(backing,
 
448
            root_client_path='root')
 
449
        self.assertEqual(SmartServerResponse(('no', )),
 
450
            request.execute('not-root'))
 
451
 
 
452
    
 
453
class TestSmartServerRequestOpenBzrDir_2_1(tests.TestCaseWithMemoryTransport):
 
454
    
 
455
    def test_no_directory(self):
 
456
        backing = self.get_transport()
 
457
        request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
 
458
        self.assertEqual(SmartServerResponse(('no', )),
 
459
            request.execute('does-not-exist'))
 
460
 
 
461
    def test_empty_directory(self):
 
462
        backing = self.get_transport()
 
463
        backing.mkdir('empty')
 
464
        request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
 
465
        self.assertEqual(SmartServerResponse(('no', )),
 
466
            request.execute('empty'))
 
467
 
 
468
    def test_present_without_workingtree(self):
 
469
        backing = self.get_transport()
 
470
        request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
 
471
        self.make_bzrdir('.')
 
472
        self.assertEqual(SmartServerResponse(('yes', 'no')),
 
473
            request.execute(''))
 
474
 
 
475
    def test_outside_root_client_path(self):
 
476
        backing = self.get_transport()
 
477
        request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing,
 
478
            root_client_path='root')
 
479
        self.assertEqual(SmartServerResponse(('no',)),
 
480
            request.execute('not-root'))
 
481
 
 
482
    
 
483
class TestSmartServerRequestOpenBzrDir_2_1_disk(TestCaseWithChrootedTransport):
 
484
 
 
485
    def test_present_with_workingtree(self):
 
486
        self.vfs_transport_factory = local.LocalURLServer
 
487
        backing = self.get_transport()
 
488
        request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
 
489
        bd = self.make_bzrdir('.')
 
490
        bd.create_repository()
 
491
        bd.create_branch()
 
492
        bd.create_workingtree()
 
493
        self.assertEqual(SmartServerResponse(('yes', 'yes')),
 
494
            request.execute(''))
 
495
 
 
496
 
396
497
class TestSmartServerRequestOpenBranch(TestCaseWithChrootedTransport):
397
498
 
398
499
    def test_no_branch(self):
413
514
 
414
515
    def test_branch_reference(self):
415
516
        """When there is a branch reference, the reference URL is returned."""
 
517
        self.vfs_transport_factory = local.LocalURLServer
416
518
        backing = self.get_transport()
417
519
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
418
520
        branch = self.make_branch('branch')
443
545
 
444
546
    def test_branch_reference(self):
445
547
        """When there is a branch reference, the reference URL is returned."""
 
548
        self.vfs_transport_factory = local.LocalURLServer
446
549
        backing = self.get_transport()
447
550
        request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
448
551
        branch = self.make_branch('branch')
1600
1703
        self.assertEqual(SmartServerResponse(('ok',)), response)
1601
1704
 
1602
1705
 
 
1706
class TestSmartServerVfsGet(tests.TestCaseWithMemoryTransport):
 
1707
 
 
1708
    def test_unicode_path(self):
 
1709
        """VFS requests expect unicode paths to be escaped."""
 
1710
        filename = u'foo\N{INTERROBANG}'
 
1711
        filename_escaped = urlutils.escape(filename)
 
1712
        backing = self.get_transport()
 
1713
        request = smart.vfs.GetRequest(backing)
 
1714
        backing.put_bytes_non_atomic(filename_escaped, 'contents')
 
1715
        self.assertEqual(SmartServerResponse(('ok', ), 'contents'),
 
1716
            request.execute(filename_escaped))
 
1717
 
 
1718
 
1603
1719
class TestHandlers(tests.TestCase):
1604
1720
    """Tests for the request.request_handlers object."""
1605
1721