/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Martin Pool
  • Date: 2009-07-27 06:28:35 UTC
  • mto: This revision was merged to the branch mainline in revision 4587.
  • Revision ID: mbp@sourcefrog.net-20090727062835-o66p8it658tq1sma
Add CountedLock.get_physical_lock_status

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
 
18
 
import errno
19
 
import os
20
 
import subprocess
21
 
import sys
22
 
import threading
 
18
from cStringIO import StringIO
23
19
 
24
 
from .. import (
 
20
import bzrlib
 
21
from bzrlib import (
25
22
    errors,
26
23
    osutils,
27
 
    tests,
28
 
    transport,
29
24
    urlutils,
30
25
    )
31
 
from ..sixish import (
32
 
    BytesIO,
33
 
    )
34
 
from ..transport import (
35
 
    chroot,
36
 
    fakenfs,
37
 
    http,
38
 
    local,
39
 
    memory,
40
 
    pathfilter,
41
 
    readonly,
42
 
    )
43
 
import breezy.transport.trace
44
 
from . import (
45
 
    features,
46
 
    test_server,
47
 
    )
 
26
from bzrlib.errors import (DependencyNotPresent,
 
27
                           FileExists,
 
28
                           InvalidURLJoin,
 
29
                           NoSuchFile,
 
30
                           PathNotChild,
 
31
                           ReadError,
 
32
                           UnsupportedProtocol,
 
33
                           )
 
34
from bzrlib.tests import TestCase, TestCaseInTempDir
 
35
from bzrlib.transport import (_clear_protocol_handlers,
 
36
                              _CoalescedOffset,
 
37
                              ConnectedTransport,
 
38
                              _get_protocol_handlers,
 
39
                              _set_protocol_handlers,
 
40
                              _get_transport_modules,
 
41
                              get_transport,
 
42
                              LateReadError,
 
43
                              register_lazy_transport,
 
44
                              register_transport_proto,
 
45
                              Transport,
 
46
                              )
 
47
from bzrlib.transport.chroot import ChrootServer
 
48
from bzrlib.transport.memory import MemoryTransport
 
49
from bzrlib.transport.local import (LocalTransport,
 
50
                                    EmulatedWin32LocalTransport)
48
51
 
49
52
 
50
53
# TODO: Should possibly split transport-specific tests into their own files.
51
54
 
52
55
 
53
 
class TestTransport(tests.TestCase):
 
56
class TestTransport(TestCase):
54
57
    """Test the non transport-concrete class functionality."""
55
58
 
56
59
    def test__get_set_protocol_handlers(self):
57
 
        handlers = transport._get_protocol_handlers()
58
 
        self.assertNotEqual([], handlers.keys())
59
 
        transport._clear_protocol_handlers()
60
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
61
 
        self.assertEqual([], transport._get_protocol_handlers().keys())
 
60
        handlers = _get_protocol_handlers()
 
61
        self.assertNotEqual([], handlers.keys( ))
 
62
        try:
 
63
            _clear_protocol_handlers()
 
64
            self.assertEqual([], _get_protocol_handlers().keys())
 
65
        finally:
 
66
            _set_protocol_handlers(handlers)
62
67
 
63
68
    def test_get_transport_modules(self):
64
 
        handlers = transport._get_protocol_handlers()
65
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
69
        handlers = _get_protocol_handlers()
66
70
        # don't pollute the current handlers
67
 
        transport._clear_protocol_handlers()
68
 
 
 
71
        _clear_protocol_handlers()
69
72
        class SampleHandler(object):
70
73
            """I exist, isnt that enough?"""
71
 
        transport._clear_protocol_handlers()
72
 
        transport.register_transport_proto('foo')
73
 
        transport.register_lazy_transport('foo',
74
 
                                          'breezy.tests.test_transport',
75
 
                                          'TestTransport.SampleHandler')
76
 
        transport.register_transport_proto('bar')
77
 
        transport.register_lazy_transport('bar',
78
 
                                          'breezy.tests.test_transport',
79
 
                                          'TestTransport.SampleHandler')
80
 
        self.assertEqual([SampleHandler.__module__,
81
 
                          'breezy.transport.chroot',
82
 
                          'breezy.transport.pathfilter'],
83
 
                         transport._get_transport_modules())
 
74
        try:
 
75
            _clear_protocol_handlers()
 
76
            register_transport_proto('foo')
 
77
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
78
                                    'TestTransport.SampleHandler')
 
79
            register_transport_proto('bar')
 
80
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
81
                                    'TestTransport.SampleHandler')
 
82
            self.assertEqual([SampleHandler.__module__,
 
83
                              'bzrlib.transport.chroot'],
 
84
                             _get_transport_modules())
 
85
        finally:
 
86
            _set_protocol_handlers(handlers)
84
87
 
85
88
    def test_transport_dependency(self):
86
89
        """Transport with missing dependency causes no error"""
87
 
        saved_handlers = transport._get_protocol_handlers()
88
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
90
        saved_handlers = _get_protocol_handlers()
89
91
        # don't pollute the current handlers
90
 
        transport._clear_protocol_handlers()
91
 
        transport.register_transport_proto('foo')
92
 
        transport.register_lazy_transport(
93
 
            'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
 
92
        _clear_protocol_handlers()
94
93
        try:
95
 
            transport.get_transport_from_url('foo://fooserver/foo')
96
 
        except errors.UnsupportedProtocol as e:
97
 
            self.assertEqual('Unsupported protocol'
98
 
                             ' for url "foo://fooserver/foo":'
99
 
                             ' Unable to import library "some_lib":'
100
 
                             ' testing missing dependency', str(e))
101
 
        else:
102
 
            self.fail('Did not raise UnsupportedProtocol')
 
94
            register_transport_proto('foo')
 
95
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
96
                    'BadTransportHandler')
 
97
            try:
 
98
                get_transport('foo://fooserver/foo')
 
99
            except UnsupportedProtocol, e:
 
100
                e_str = str(e)
 
101
                self.assertEquals('Unsupported protocol'
 
102
                                  ' for url "foo://fooserver/foo":'
 
103
                                  ' Unable to import library "some_lib":'
 
104
                                  ' testing missing dependency', str(e))
 
105
            else:
 
106
                self.fail('Did not raise UnsupportedProtocol')
 
107
        finally:
 
108
            # restore original values
 
109
            _set_protocol_handlers(saved_handlers)
103
110
 
104
111
    def test_transport_fallback(self):
105
112
        """Transport with missing dependency causes no error"""
106
 
        saved_handlers = transport._get_protocol_handlers()
107
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
108
 
        transport._clear_protocol_handlers()
109
 
        transport.register_transport_proto('foo')
110
 
        transport.register_lazy_transport(
111
 
            'foo', 'breezy.tests.test_transport', 'BackupTransportHandler')
112
 
        transport.register_lazy_transport(
113
 
            'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
114
 
        t = transport.get_transport_from_url('foo://fooserver/foo')
115
 
        self.assertTrue(isinstance(t, BackupTransportHandler))
 
113
        saved_handlers = _get_protocol_handlers()
 
114
        try:
 
115
            _clear_protocol_handlers()
 
116
            register_transport_proto('foo')
 
117
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
118
                    'BackupTransportHandler')
 
119
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
120
                    'BadTransportHandler')
 
121
            t = get_transport('foo://fooserver/foo')
 
122
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
123
        finally:
 
124
            _set_protocol_handlers(saved_handlers)
116
125
 
117
126
    def test_ssh_hints(self):
118
127
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
119
128
        try:
120
 
            transport.get_transport_from_url('ssh://fooserver/foo')
121
 
        except errors.UnsupportedProtocol as e:
122
 
            self.assertEqual(
123
 
                'Unsupported protocol'
124
 
                ' for url "ssh://fooserver/foo":'
125
 
                ' Use bzr+ssh for Bazaar operations over SSH, '
126
 
                'e.g. "bzr+ssh://fooserver/foo". Use git+ssh '
127
 
                'for Git operations over SSH, e.g. "git+ssh://fooserver/foo".',
128
 
                str(e))
 
129
            get_transport('ssh://fooserver/foo')
 
130
        except UnsupportedProtocol, e:
 
131
            e_str = str(e)
 
132
            self.assertEquals('Unsupported protocol'
 
133
                              ' for url "ssh://fooserver/foo":'
 
134
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
 
135
                              str(e))
129
136
        else:
130
137
            self.fail('Did not raise UnsupportedProtocol')
131
138
 
132
139
    def test_LateReadError(self):
133
140
        """The LateReadError helper should raise on read()."""
134
 
        a_file = transport.LateReadError('a path')
 
141
        a_file = LateReadError('a path')
135
142
        try:
136
143
            a_file.read()
137
 
        except errors.ReadError as error:
 
144
        except ReadError, error:
138
145
            self.assertEqual('a path', error.path)
139
 
        self.assertRaises(errors.ReadError, a_file.read, 40)
 
146
        self.assertRaises(ReadError, a_file.read, 40)
140
147
        a_file.close()
141
148
 
 
149
    def test__combine_paths(self):
 
150
        t = Transport('/')
 
151
        self.assertEqual('/home/sarah/project/foo',
 
152
                         t._combine_paths('/home/sarah', 'project/foo'))
 
153
        self.assertEqual('/etc',
 
154
                         t._combine_paths('/home/sarah', '../../etc'))
 
155
        self.assertEqual('/etc',
 
156
                         t._combine_paths('/home/sarah', '../../../etc'))
 
157
        self.assertEqual('/etc',
 
158
                         t._combine_paths('/home/sarah', '/etc'))
 
159
 
142
160
    def test_local_abspath_non_local_transport(self):
143
161
        # the base implementation should throw
144
 
        t = memory.MemoryTransport()
 
162
        t = MemoryTransport()
145
163
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
146
164
        self.assertEqual('memory:///t is not a local path.', str(e))
147
165
 
148
166
 
149
 
class TestCoalesceOffsets(tests.TestCase):
 
167
class TestCoalesceOffsets(TestCase):
150
168
 
151
169
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
152
 
        coalesce = transport.Transport._coalesce_offsets
153
 
        exp = [transport._CoalescedOffset(*x) for x in expected]
 
170
        coalesce = Transport._coalesce_offsets
 
171
        exp = [_CoalescedOffset(*x) for x in expected]
154
172
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
155
173
                            max_size=max_size))
156
174
        self.assertEqual(exp, out)
164
182
    def test_coalesce_unrelated(self):
165
183
        self.check([(0, 10, [(0, 10)]),
166
184
                    (20, 10, [(0, 10)]),
167
 
                    ], [(0, 10), (20, 10)])
 
185
                   ], [(0, 10), (20, 10)])
168
186
 
169
187
    def test_coalesce_unsorted(self):
170
188
        self.check([(20, 10, [(0, 10)]),
171
189
                    (0, 10, [(0, 10)]),
172
 
                    ], [(20, 10), (0, 10)])
 
190
                   ], [(20, 10), (0, 10)])
173
191
 
174
192
    def test_coalesce_nearby(self):
175
193
        self.check([(0, 20, [(0, 10), (10, 10)])],
177
195
 
178
196
    def test_coalesce_overlapped(self):
179
197
        self.assertRaises(ValueError,
180
 
                          self.check, [(0, 15, [(0, 10), (5, 10)])],
181
 
                          [(0, 10), (5, 10)])
 
198
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
199
                        [(0, 10), (5, 10)])
182
200
 
183
201
    def test_coalesce_limit(self):
184
202
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
185
203
                              (30, 10), (40, 10)]),
186
204
                    (60, 50, [(0, 10), (10, 10), (20, 10),
187
205
                              (30, 10), (40, 10)]),
188
 
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
189
 
                        (50, 10), (60, 10), (70, 10), (80, 10),
190
 
                        (90, 10), (100, 10)],
191
 
                   limit=5)
 
206
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
207
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
208
                       (90, 10), (100, 10)],
 
209
                    limit=5)
192
210
 
193
211
    def test_coalesce_no_limit(self):
194
212
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
195
213
                               (30, 10), (40, 10), (50, 10),
196
214
                               (60, 10), (70, 10), (80, 10),
197
215
                               (90, 10)]),
198
 
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
199
 
                        (50, 10), (60, 10), (70, 10), (80, 10),
200
 
                        (90, 10), (100, 10)])
 
216
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
217
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
218
                       (90, 10), (100, 10)])
201
219
 
202
220
    def test_coalesce_fudge(self):
203
221
        self.check([(10, 30, [(0, 10), (20, 10)]),
204
 
                    (100, 10, [(0, 10)]),
205
 
                    ], [(10, 10), (30, 10), (100, 10)],
206
 
                   fudge=10)
207
 
 
 
222
                    (100, 10, [(0, 10),]),
 
223
                   ], [(10, 10), (30, 10), (100, 10)],
 
224
                   fudge=10
 
225
                  )
208
226
    def test_coalesce_max_size(self):
209
227
        self.check([(10, 20, [(0, 10), (10, 10)]),
210
228
                    (30, 50, [(0, 50)]),
211
229
                    # If one range is above max_size, it gets its own coalesced
212
230
                    # offset
213
 
                    (100, 80, [(0, 80)]), ],
 
231
                    (100, 80, [(0, 80),]),],
214
232
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
215
 
                   max_size=50)
 
233
                   max_size=50
 
234
                  )
216
235
 
217
236
    def test_coalesce_no_max_size(self):
218
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
 
237
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
219
238
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
220
 
                   )
 
239
                  )
221
240
 
222
241
    def test_coalesce_default_limit(self):
223
242
        # By default we use a 100MB max size.
224
 
        ten_mb = 10 * 1024 * 1024
225
 
        self.check(
226
 
            [(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
227
 
             (10 * ten_mb, ten_mb, [(0, ten_mb)])],
228
 
            [(i * ten_mb, ten_mb) for i in range(11)])
229
 
        self.check(
230
 
            [(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
231
 
            [(i * ten_mb, ten_mb) for i in range(11)],
232
 
            max_size=1 * 1024 * 1024 * 1024)
233
 
 
234
 
 
235
 
class TestMemoryServer(tests.TestCase):
236
 
 
237
 
    def test_create_server(self):
238
 
        server = memory.MemoryServer()
239
 
        server.start_server()
240
 
        url = server.get_url()
241
 
        self.assertTrue(url in transport.transport_list_registry)
242
 
        t = transport.get_transport_from_url(url)
243
 
        del t
244
 
        server.stop_server()
245
 
        self.assertFalse(url in transport.transport_list_registry)
246
 
        self.assertRaises(errors.UnsupportedProtocol,
247
 
                          transport.get_transport, url)
248
 
 
249
 
 
250
 
class TestMemoryTransport(tests.TestCase):
 
243
        ten_mb = 10*1024*1024
 
244
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
245
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
246
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
247
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
248
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
249
                   max_size=1*1024*1024*1024)
 
250
 
 
251
 
 
252
class TestMemoryTransport(TestCase):
251
253
 
252
254
    def test_get_transport(self):
253
 
        memory.MemoryTransport()
 
255
        MemoryTransport()
254
256
 
255
257
    def test_clone(self):
256
 
        t = memory.MemoryTransport()
257
 
        self.assertTrue(isinstance(t, memory.MemoryTransport))
258
 
        self.assertEqual("memory:///", t.clone("/").base)
 
258
        transport = MemoryTransport()
 
259
        self.assertTrue(isinstance(transport, MemoryTransport))
 
260
        self.assertEqual("memory:///", transport.clone("/").base)
259
261
 
260
262
    def test_abspath(self):
261
 
        t = memory.MemoryTransport()
262
 
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
 
263
        transport = MemoryTransport()
 
264
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
263
265
 
264
266
    def test_abspath_of_root(self):
265
 
        t = memory.MemoryTransport()
266
 
        self.assertEqual("memory:///", t.base)
267
 
        self.assertEqual("memory:///", t.abspath('/'))
 
267
        transport = MemoryTransport()
 
268
        self.assertEqual("memory:///", transport.base)
 
269
        self.assertEqual("memory:///", transport.abspath('/'))
268
270
 
269
271
    def test_abspath_of_relpath_starting_at_root(self):
270
 
        t = memory.MemoryTransport()
271
 
        self.assertEqual("memory:///foo", t.abspath('/foo'))
 
272
        transport = MemoryTransport()
 
273
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
272
274
 
273
275
    def test_append_and_get(self):
274
 
        t = memory.MemoryTransport()
275
 
        t.append_bytes('path', b'content')
276
 
        self.assertEqual(t.get('path').read(), b'content')
277
 
        t.append_file('path', BytesIO(b'content'))
278
 
        with t.get('path') as f:
279
 
            self.assertEqual(f.read(), b'contentcontent')
 
276
        transport = MemoryTransport()
 
277
        transport.append_bytes('path', 'content')
 
278
        self.assertEqual(transport.get('path').read(), 'content')
 
279
        transport.append_file('path', StringIO('content'))
 
280
        self.assertEqual(transport.get('path').read(), 'contentcontent')
280
281
 
281
282
    def test_put_and_get(self):
282
 
        t = memory.MemoryTransport()
283
 
        t.put_file('path', BytesIO(b'content'))
284
 
        self.assertEqual(t.get('path').read(), b'content')
285
 
        t.put_bytes('path', b'content')
286
 
        self.assertEqual(t.get('path').read(), b'content')
 
283
        transport = MemoryTransport()
 
284
        transport.put_file('path', StringIO('content'))
 
285
        self.assertEqual(transport.get('path').read(), 'content')
 
286
        transport.put_bytes('path', 'content')
 
287
        self.assertEqual(transport.get('path').read(), 'content')
287
288
 
288
289
    def test_append_without_dir_fails(self):
289
 
        t = memory.MemoryTransport()
290
 
        self.assertRaises(errors.NoSuchFile,
291
 
                          t.append_bytes, 'dir/path', b'content')
 
290
        transport = MemoryTransport()
 
291
        self.assertRaises(NoSuchFile,
 
292
                          transport.append_bytes, 'dir/path', 'content')
292
293
 
293
294
    def test_put_without_dir_fails(self):
294
 
        t = memory.MemoryTransport()
295
 
        self.assertRaises(errors.NoSuchFile,
296
 
                          t.put_file, 'dir/path', BytesIO(b'content'))
 
295
        transport = MemoryTransport()
 
296
        self.assertRaises(NoSuchFile,
 
297
                          transport.put_file, 'dir/path', StringIO('content'))
297
298
 
298
299
    def test_get_missing(self):
299
 
        transport = memory.MemoryTransport()
300
 
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
 
300
        transport = MemoryTransport()
 
301
        self.assertRaises(NoSuchFile, transport.get, 'foo')
301
302
 
302
303
    def test_has_missing(self):
303
 
        t = memory.MemoryTransport()
304
 
        self.assertEqual(False, t.has('foo'))
 
304
        transport = MemoryTransport()
 
305
        self.assertEquals(False, transport.has('foo'))
305
306
 
306
307
    def test_has_present(self):
307
 
        t = memory.MemoryTransport()
308
 
        t.append_bytes('foo', b'content')
309
 
        self.assertEqual(True, t.has('foo'))
 
308
        transport = MemoryTransport()
 
309
        transport.append_bytes('foo', 'content')
 
310
        self.assertEquals(True, transport.has('foo'))
310
311
 
311
312
    def test_list_dir(self):
312
 
        t = memory.MemoryTransport()
313
 
        t.put_bytes('foo', b'content')
314
 
        t.mkdir('dir')
315
 
        t.put_bytes('dir/subfoo', b'content')
316
 
        t.put_bytes('dirlike', b'content')
 
313
        transport = MemoryTransport()
 
314
        transport.put_bytes('foo', 'content')
 
315
        transport.mkdir('dir')
 
316
        transport.put_bytes('dir/subfoo', 'content')
 
317
        transport.put_bytes('dirlike', 'content')
317
318
 
318
 
        self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
319
 
        self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
 
319
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
320
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
320
321
 
321
322
    def test_mkdir(self):
322
 
        t = memory.MemoryTransport()
323
 
        t.mkdir('dir')
324
 
        t.append_bytes('dir/path', b'content')
325
 
        with t.get('dir/path') as f:
326
 
            self.assertEqual(f.read(), b'content')
 
323
        transport = MemoryTransport()
 
324
        transport.mkdir('dir')
 
325
        transport.append_bytes('dir/path', 'content')
 
326
        self.assertEqual(transport.get('dir/path').read(), 'content')
327
327
 
328
328
    def test_mkdir_missing_parent(self):
329
 
        t = memory.MemoryTransport()
330
 
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
 
329
        transport = MemoryTransport()
 
330
        self.assertRaises(NoSuchFile,
 
331
                          transport.mkdir, 'dir/dir')
331
332
 
332
333
    def test_mkdir_twice(self):
333
 
        t = memory.MemoryTransport()
334
 
        t.mkdir('dir')
335
 
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
 
334
        transport = MemoryTransport()
 
335
        transport.mkdir('dir')
 
336
        self.assertRaises(FileExists, transport.mkdir, 'dir')
336
337
 
337
338
    def test_parameters(self):
338
 
        t = memory.MemoryTransport()
339
 
        self.assertEqual(True, t.listable())
340
 
        self.assertEqual(False, t.is_readonly())
 
339
        transport = MemoryTransport()
 
340
        self.assertEqual(True, transport.listable())
 
341
        self.assertEqual(False, transport.is_readonly())
341
342
 
342
343
    def test_iter_files_recursive(self):
343
 
        t = memory.MemoryTransport()
344
 
        t.mkdir('dir')
345
 
        t.put_bytes('dir/foo', b'content')
346
 
        t.put_bytes('dir/bar', b'content')
347
 
        t.put_bytes('bar', b'content')
348
 
        paths = set(t.iter_files_recursive())
349
 
        self.assertEqual({'dir/foo', 'dir/bar', 'bar'}, paths)
 
344
        transport = MemoryTransport()
 
345
        transport.mkdir('dir')
 
346
        transport.put_bytes('dir/foo', 'content')
 
347
        transport.put_bytes('dir/bar', 'content')
 
348
        transport.put_bytes('bar', 'content')
 
349
        paths = set(transport.iter_files_recursive())
 
350
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
350
351
 
351
352
    def test_stat(self):
352
 
        t = memory.MemoryTransport()
353
 
        t.put_bytes('foo', b'content')
354
 
        t.put_bytes('bar', b'phowar')
355
 
        self.assertEqual(7, t.stat('foo').st_size)
356
 
        self.assertEqual(6, t.stat('bar').st_size)
357
 
 
358
 
 
359
 
class ChrootDecoratorTransportTest(tests.TestCase):
 
353
        transport = MemoryTransport()
 
354
        transport.put_bytes('foo', 'content')
 
355
        transport.put_bytes('bar', 'phowar')
 
356
        self.assertEqual(7, transport.stat('foo').st_size)
 
357
        self.assertEqual(6, transport.stat('bar').st_size)
 
358
 
 
359
 
 
360
class ChrootDecoratorTransportTest(TestCase):
360
361
    """Chroot decoration specific tests."""
361
362
 
362
363
    def test_abspath(self):
363
364
        # The abspath is always relative to the chroot_url.
364
 
        server = chroot.ChrootServer(
365
 
            transport.get_transport_from_url('memory:///foo/bar/'))
366
 
        self.start_server(server)
367
 
        t = transport.get_transport_from_url(server.get_url())
368
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
365
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
366
        server.setUp()
 
367
        transport = get_transport(server.get_url())
 
368
        self.assertEqual(server.get_url(), transport.abspath('/'))
369
369
 
370
 
        subdir_t = t.clone('subdir')
371
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
370
        subdir_transport = transport.clone('subdir')
 
371
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
372
        server.tearDown()
372
373
 
373
374
    def test_clone(self):
374
 
        server = chroot.ChrootServer(
375
 
            transport.get_transport_from_url('memory:///foo/bar/'))
376
 
        self.start_server(server)
377
 
        t = transport.get_transport_from_url(server.get_url())
 
375
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
376
        server.setUp()
 
377
        transport = get_transport(server.get_url())
378
378
        # relpath from root and root path are the same
379
 
        relpath_cloned = t.clone('foo')
380
 
        abspath_cloned = t.clone('/foo')
 
379
        relpath_cloned = transport.clone('foo')
 
380
        abspath_cloned = transport.clone('/foo')
381
381
        self.assertEqual(server, relpath_cloned.server)
382
382
        self.assertEqual(server, abspath_cloned.server)
 
383
        server.tearDown()
383
384
 
384
385
    def test_chroot_url_preserves_chroot(self):
385
386
        """Calling get_transport on a chroot transport's base should produce a
389
390
        This is so that it is not possible to escape a chroot by doing::
390
391
            url = chroot_transport.base
391
392
            parent_url = urlutils.join(url, '..')
392
 
            new_t = transport.get_transport_from_url(parent_url)
 
393
            new_transport = get_transport(parent_url)
393
394
        """
394
 
        server = chroot.ChrootServer(
395
 
            transport.get_transport_from_url('memory:///path/subpath'))
396
 
        self.start_server(server)
397
 
        t = transport.get_transport_from_url(server.get_url())
398
 
        new_t = transport.get_transport_from_url(t.base)
399
 
        self.assertEqual(t.server, new_t.server)
400
 
        self.assertEqual(t.base, new_t.base)
 
395
        server = ChrootServer(get_transport('memory:///path/subpath'))
 
396
        server.setUp()
 
397
        transport = get_transport(server.get_url())
 
398
        new_transport = get_transport(transport.base)
 
399
        self.assertEqual(transport.server, new_transport.server)
 
400
        self.assertEqual(transport.base, new_transport.base)
 
401
        server.tearDown()
401
402
 
402
403
    def test_urljoin_preserves_chroot(self):
403
404
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
406
407
        This is so that it is not possible to escape a chroot by doing::
407
408
            url = chroot_transport.base
408
409
            parent_url = urlutils.join(url, '..')
409
 
            new_t = transport.get_transport_from_url(parent_url)
 
410
            new_transport = get_transport(parent_url)
410
411
        """
411
 
        server = chroot.ChrootServer(
412
 
            transport.get_transport_from_url('memory:///path/'))
413
 
        self.start_server(server)
414
 
        t = transport.get_transport_from_url(server.get_url())
 
412
        server = ChrootServer(get_transport('memory:///path/'))
 
413
        server.setUp()
 
414
        transport = get_transport(server.get_url())
415
415
        self.assertRaises(
416
 
            urlutils.InvalidURLJoin, urlutils.join, t.base, '..')
417
 
 
418
 
 
419
 
class TestChrootServer(tests.TestCase):
 
416
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
417
        server.tearDown()
 
418
 
 
419
 
 
420
class ChrootServerTest(TestCase):
420
421
 
421
422
    def test_construct(self):
422
 
        backing_transport = memory.MemoryTransport()
423
 
        server = chroot.ChrootServer(backing_transport)
 
423
        backing_transport = MemoryTransport()
 
424
        server = ChrootServer(backing_transport)
424
425
        self.assertEqual(backing_transport, server.backing_transport)
425
426
 
426
427
    def test_setUp(self):
427
 
        backing_transport = memory.MemoryTransport()
428
 
        server = chroot.ChrootServer(backing_transport)
429
 
        server.start_server()
430
 
        self.addCleanup(server.stop_server)
431
 
        self.assertTrue(server.scheme
432
 
                        in transport._get_protocol_handlers().keys())
 
428
        backing_transport = MemoryTransport()
 
429
        server = ChrootServer(backing_transport)
 
430
        server.setUp()
 
431
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
433
432
 
434
 
    def test_stop_server(self):
435
 
        backing_transport = memory.MemoryTransport()
436
 
        server = chroot.ChrootServer(backing_transport)
437
 
        server.start_server()
438
 
        server.stop_server()
439
 
        self.assertFalse(server.scheme
440
 
                         in transport._get_protocol_handlers().keys())
 
433
    def test_tearDown(self):
 
434
        backing_transport = MemoryTransport()
 
435
        server = ChrootServer(backing_transport)
 
436
        server.setUp()
 
437
        server.tearDown()
 
438
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
441
439
 
442
440
    def test_get_url(self):
443
 
        backing_transport = memory.MemoryTransport()
444
 
        server = chroot.ChrootServer(backing_transport)
445
 
        server.start_server()
446
 
        self.addCleanup(server.stop_server)
 
441
        backing_transport = MemoryTransport()
 
442
        server = ChrootServer(backing_transport)
 
443
        server.setUp()
447
444
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
448
 
 
449
 
 
450
 
class TestHooks(tests.TestCase):
451
 
    """Basic tests for transport hooks"""
452
 
 
453
 
    def _get_connected_transport(self):
454
 
        return transport.ConnectedTransport("bogus:nowhere")
455
 
 
456
 
    def test_transporthooks_initialisation(self):
457
 
        """Check all expected transport hook points are set up"""
458
 
        hookpoint = transport.TransportHooks()
459
 
        self.assertTrue("post_connect" in hookpoint,
460
 
                        "post_connect not in %s" % (hookpoint,))
461
 
 
462
 
    def test_post_connect(self):
463
 
        """Ensure the post_connect hook is called when _set_transport is"""
464
 
        calls = []
465
 
        transport.Transport.hooks.install_named_hook("post_connect",
466
 
                                                     calls.append, None)
467
 
        t = self._get_connected_transport()
468
 
        self.assertLength(0, calls)
469
 
        t._set_connection("connection", "auth")
470
 
        self.assertEqual(calls, [t])
471
 
 
472
 
 
473
 
class PathFilteringDecoratorTransportTest(tests.TestCase):
474
 
    """Pathfilter decoration specific tests."""
475
 
 
476
 
    def test_abspath(self):
477
 
        # The abspath is always relative to the base of the backing transport.
478
 
        server = pathfilter.PathFilteringServer(
479
 
            transport.get_transport_from_url('memory:///foo/bar/'),
480
 
            lambda x: x)
481
 
        server.start_server()
482
 
        t = transport.get_transport_from_url(server.get_url())
483
 
        self.assertEqual(server.get_url(), t.abspath('/'))
484
 
 
485
 
        subdir_t = t.clone('subdir')
486
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
487
 
        server.stop_server()
488
 
 
489
 
    def make_pf_transport(self, filter_func=None):
490
 
        """Make a PathFilteringTransport backed by a MemoryTransport.
491
 
 
492
 
        :param filter_func: by default this will be a no-op function.  Use this
493
 
            parameter to override it."""
494
 
        if filter_func is None:
495
 
            def filter_func(x):
496
 
                return x
497
 
        server = pathfilter.PathFilteringServer(
498
 
            transport.get_transport_from_url('memory:///foo/bar/'),
499
 
            filter_func)
500
 
        server.start_server()
501
 
        self.addCleanup(server.stop_server)
502
 
        return transport.get_transport_from_url(server.get_url())
503
 
 
504
 
    def test__filter(self):
505
 
        # _filter (with an identity func as filter_func) always returns
506
 
        # paths relative to the base of the backing transport.
507
 
        t = self.make_pf_transport()
508
 
        self.assertEqual('foo', t._filter('foo'))
509
 
        self.assertEqual('foo/bar', t._filter('foo/bar'))
510
 
        self.assertEqual('', t._filter('..'))
511
 
        self.assertEqual('', t._filter('/'))
512
 
        # The base of the pathfiltering transport is taken into account too.
513
 
        t = t.clone('subdir1/subdir2')
514
 
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
515
 
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
516
 
        self.assertEqual('subdir1', t._filter('..'))
517
 
        self.assertEqual('', t._filter('/'))
518
 
 
519
 
    def test_filter_invocation(self):
520
 
        filter_log = []
521
 
 
522
 
        def filter(path):
523
 
            filter_log.append(path)
524
 
            return path
525
 
        t = self.make_pf_transport(filter)
526
 
        t.has('abc')
527
 
        self.assertEqual(['abc'], filter_log)
528
 
        del filter_log[:]
529
 
        t.clone('abc').has('xyz')
530
 
        self.assertEqual(['abc/xyz'], filter_log)
531
 
        del filter_log[:]
532
 
        t.has('/abc')
533
 
        self.assertEqual(['abc'], filter_log)
534
 
 
535
 
    def test_clone(self):
536
 
        t = self.make_pf_transport()
537
 
        # relpath from root and root path are the same
538
 
        relpath_cloned = t.clone('foo')
539
 
        abspath_cloned = t.clone('/foo')
540
 
        self.assertEqual(t.server, relpath_cloned.server)
541
 
        self.assertEqual(t.server, abspath_cloned.server)
542
 
 
543
 
    def test_url_preserves_pathfiltering(self):
544
 
        """Calling get_transport on a pathfiltered transport's base should
545
 
        produce a transport with exactly the same behaviour as the original
546
 
        pathfiltered transport.
547
 
 
548
 
        This is so that it is not possible to escape (accidentally or
549
 
        otherwise) the filtering by doing::
550
 
            url = filtered_transport.base
551
 
            parent_url = urlutils.join(url, '..')
552
 
            new_t = transport.get_transport_from_url(parent_url)
553
 
        """
554
 
        t = self.make_pf_transport()
555
 
        new_t = transport.get_transport_from_url(t.base)
556
 
        self.assertEqual(t.server, new_t.server)
557
 
        self.assertEqual(t.base, new_t.base)
558
 
 
559
 
 
560
 
class ReadonlyDecoratorTransportTest(tests.TestCase):
 
445
        server.tearDown()
 
446
 
 
447
 
 
448
class ReadonlyDecoratorTransportTest(TestCase):
561
449
    """Readonly decoration specific tests."""
562
450
 
563
451
    def test_local_parameters(self):
 
452
        import bzrlib.transport.readonly as readonly
564
453
        # connect to . in readonly mode
565
 
        t = readonly.ReadonlyTransportDecorator('readonly+.')
566
 
        self.assertEqual(True, t.listable())
567
 
        self.assertEqual(True, t.is_readonly())
 
454
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
455
        self.assertEqual(True, transport.listable())
 
456
        self.assertEqual(True, transport.is_readonly())
568
457
 
569
458
    def test_http_parameters(self):
570
 
        from breezy.tests.http_server import HttpServer
 
459
        from bzrlib.tests.http_server import HttpServer
 
460
        import bzrlib.transport.readonly as readonly
571
461
        # connect to '.' via http which is not listable
572
462
        server = HttpServer()
573
 
        self.start_server(server)
574
 
        t = transport.get_transport_from_url('readonly+' + server.get_url())
575
 
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
576
 
        self.assertEqual(False, t.listable())
577
 
        self.assertEqual(True, t.is_readonly())
578
 
 
579
 
 
580
 
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
 
463
        server.setUp()
 
464
        try:
 
465
            transport = get_transport('readonly+' + server.get_url())
 
466
            self.failUnless(isinstance(transport,
 
467
                                       readonly.ReadonlyTransportDecorator))
 
468
            self.assertEqual(False, transport.listable())
 
469
            self.assertEqual(True, transport.is_readonly())
 
470
        finally:
 
471
            server.tearDown()
 
472
 
 
473
 
 
474
class FakeNFSDecoratorTests(TestCaseInTempDir):
581
475
    """NFS decorator specific tests."""
582
476
 
583
477
    def get_nfs_transport(self, url):
 
478
        import bzrlib.transport.fakenfs as fakenfs
584
479
        # connect to url with nfs decoration
585
480
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
586
481
 
587
482
    def test_local_parameters(self):
588
483
        # the listable and is_readonly parameters
589
484
        # are not changed by the fakenfs decorator
590
 
        t = self.get_nfs_transport('.')
591
 
        self.assertEqual(True, t.listable())
592
 
        self.assertEqual(False, t.is_readonly())
 
485
        transport = self.get_nfs_transport('.')
 
486
        self.assertEqual(True, transport.listable())
 
487
        self.assertEqual(False, transport.is_readonly())
593
488
 
594
489
    def test_http_parameters(self):
595
490
        # the listable and is_readonly parameters
596
491
        # are not changed by the fakenfs decorator
597
 
        from breezy.tests.http_server import HttpServer
 
492
        from bzrlib.tests.http_server import HttpServer
598
493
        # connect to '.' via http which is not listable
599
494
        server = HttpServer()
600
 
        self.start_server(server)
601
 
        t = self.get_nfs_transport(server.get_url())
602
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
603
 
        self.assertEqual(False, t.listable())
604
 
        self.assertEqual(True, t.is_readonly())
 
495
        server.setUp()
 
496
        try:
 
497
            transport = self.get_nfs_transport(server.get_url())
 
498
            self.assertIsInstance(
 
499
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
500
            self.assertEqual(False, transport.listable())
 
501
            self.assertEqual(True, transport.is_readonly())
 
502
        finally:
 
503
            server.tearDown()
605
504
 
606
505
    def test_fakenfs_server_default(self):
607
506
        # a FakeNFSServer() should bring up a local relpath server for itself
608
 
        server = test_server.FakeNFSServer()
609
 
        self.start_server(server)
610
 
        # the url should be decorated appropriately
611
 
        self.assertStartsWith(server.get_url(), 'fakenfs+')
612
 
        # and we should be able to get a transport for it
613
 
        t = transport.get_transport_from_url(server.get_url())
614
 
        # which must be a FakeNFSTransportDecorator instance.
615
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
507
        import bzrlib.transport.fakenfs as fakenfs
 
508
        server = fakenfs.FakeNFSServer()
 
509
        server.setUp()
 
510
        try:
 
511
            # the url should be decorated appropriately
 
512
            self.assertStartsWith(server.get_url(), 'fakenfs+')
 
513
            # and we should be able to get a transport for it
 
514
            transport = get_transport(server.get_url())
 
515
            # which must be a FakeNFSTransportDecorator instance.
 
516
            self.assertIsInstance(
 
517
                transport, fakenfs.FakeNFSTransportDecorator)
 
518
        finally:
 
519
            server.tearDown()
616
520
 
617
521
    def test_fakenfs_rename_semantics(self):
618
522
        # a FakeNFS transport must mangle the way rename errors occur to
619
523
        # look like NFS problems.
620
 
        t = self.get_nfs_transport('.')
 
524
        transport = self.get_nfs_transport('.')
621
525
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
622
 
                        transport=t)
623
 
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
624
 
 
625
 
 
626
 
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
 
526
                        transport=transport)
 
527
        self.assertRaises(errors.ResourceBusy,
 
528
                          transport.rename, 'from', 'to')
 
529
 
 
530
 
 
531
class FakeVFATDecoratorTests(TestCaseInTempDir):
627
532
    """Tests for simulation of VFAT restrictions"""
628
533
 
629
534
    def get_vfat_transport(self, url):
630
535
        """Return vfat-backed transport for test directory"""
631
 
        from breezy.transport.fakevfat import FakeVFATTransportDecorator
 
536
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
632
537
        return FakeVFATTransportDecorator('vfat+' + url)
633
538
 
634
539
    def test_transport_creation(self):
635
 
        from breezy.transport.fakevfat import FakeVFATTransportDecorator
636
 
        t = self.get_vfat_transport('.')
637
 
        self.assertIsInstance(t, FakeVFATTransportDecorator)
 
540
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
541
        transport = self.get_vfat_transport('.')
 
542
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
638
543
 
639
544
    def test_transport_mkdir(self):
640
 
        t = self.get_vfat_transport('.')
641
 
        t.mkdir('HELLO')
642
 
        self.assertTrue(t.has('hello'))
643
 
        self.assertTrue(t.has('Hello'))
 
545
        transport = self.get_vfat_transport('.')
 
546
        transport.mkdir('HELLO')
 
547
        self.assertTrue(transport.has('hello'))
 
548
        self.assertTrue(transport.has('Hello'))
644
549
 
645
550
    def test_forbidden_chars(self):
646
 
        t = self.get_vfat_transport('.')
647
 
        self.assertRaises(ValueError, t.has, "<NU>")
648
 
 
649
 
 
650
 
class BadTransportHandler(transport.Transport):
 
551
        transport = self.get_vfat_transport('.')
 
552
        self.assertRaises(ValueError, transport.has, "<NU>")
 
553
 
 
554
 
 
555
class BadTransportHandler(Transport):
651
556
    def __init__(self, base_url):
652
 
        raise errors.DependencyNotPresent('some_lib',
653
 
                                          'testing missing dependency')
654
 
 
655
 
 
656
 
class BackupTransportHandler(transport.Transport):
 
557
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
558
 
 
559
 
 
560
class BackupTransportHandler(Transport):
657
561
    """Test transport that works as a backup for the BadTransportHandler"""
658
562
    pass
659
563
 
660
564
 
661
 
class TestTransportImplementation(tests.TestCaseInTempDir):
 
565
class TestTransportImplementation(TestCaseInTempDir):
662
566
    """Implementation verification for transports.
663
567
 
664
568
    To verify a transport we need a server factory, which is a callable
665
569
    that accepts no parameters and returns an implementation of
666
 
    breezy.transport.Server.
 
570
    bzrlib.transport.Server.
667
571
 
668
572
    That Server is then used to construct transport instances and test
669
573
    the transport via loopback activity.
683
587
    def setUp(self):
684
588
        super(TestTransportImplementation, self).setUp()
685
589
        self._server = self.transport_server()
686
 
        self.start_server(self._server)
 
590
        self._server.setUp()
 
591
        self.addCleanup(self._server.tearDown)
687
592
 
688
593
    def get_transport(self, relpath=None):
689
594
        """Return a connected transport to the local directory.
693
598
        base_url = self._server.get_url()
694
599
        url = self._adjust_url(base_url, relpath)
695
600
        # try getting the transport via the regular interface:
696
 
        t = transport.get_transport_from_url(url)
 
601
        t = get_transport(url)
697
602
        # vila--20070607 if the following are commented out the test suite
698
603
        # still pass. Is this really still needed or was it a forgotten
699
604
        # temporary fix ?
704
609
        return t
705
610
 
706
611
 
707
 
class TestTransportFromPath(tests.TestCaseInTempDir):
708
 
 
709
 
    def test_with_path(self):
710
 
        t = transport.get_transport_from_path(self.test_dir)
711
 
        self.assertIsInstance(t, local.LocalTransport)
712
 
        self.assertEqual(t.base.rstrip("/"),
713
 
                         urlutils.local_path_to_url(self.test_dir))
714
 
 
715
 
    def test_with_url(self):
716
 
        t = transport.get_transport_from_path("file:")
717
 
        self.assertIsInstance(t, local.LocalTransport)
718
 
        self.assertEqual(
719
 
            t.base.rstrip("/"),
720
 
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
721
 
 
722
 
 
723
 
class TestTransportFromUrl(tests.TestCaseInTempDir):
724
 
 
725
 
    def test_with_path(self):
726
 
        self.assertRaises(urlutils.InvalidURL, transport.get_transport_from_url,
727
 
                          self.test_dir)
728
 
 
729
 
    def test_with_url(self):
730
 
        url = urlutils.local_path_to_url(self.test_dir)
731
 
        t = transport.get_transport_from_url(url)
732
 
        self.assertIsInstance(t, local.LocalTransport)
733
 
        self.assertEqual(t.base.rstrip("/"), url)
734
 
 
735
 
    def test_with_url_and_segment_parameters(self):
736
 
        url = urlutils.local_path_to_url(self.test_dir) + ",branch=foo"
737
 
        t = transport.get_transport_from_url(url)
738
 
        self.assertIsInstance(t, local.LocalTransport)
739
 
        self.assertEqual(t.base.rstrip("/"), url)
740
 
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
741
 
            f.write("data")
742
 
        self.assertTrue(t.has("afile"))
743
 
 
744
 
 
745
 
class TestLocalTransports(tests.TestCase):
 
612
class TestLocalTransports(TestCase):
746
613
 
747
614
    def test_get_transport_from_abspath(self):
748
615
        here = osutils.abspath('.')
749
 
        t = transport.get_transport(here)
750
 
        self.assertIsInstance(t, local.LocalTransport)
751
 
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')
 
616
        t = get_transport(here)
 
617
        self.assertIsInstance(t, LocalTransport)
 
618
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
752
619
 
753
620
    def test_get_transport_from_relpath(self):
754
 
        t = transport.get_transport('.')
755
 
        self.assertIsInstance(t, local.LocalTransport)
756
 
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')
 
621
        here = osutils.abspath('.')
 
622
        t = get_transport('.')
 
623
        self.assertIsInstance(t, LocalTransport)
 
624
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
757
625
 
758
626
    def test_get_transport_from_local_url(self):
759
627
        here = osutils.abspath('.')
760
628
        here_url = urlutils.local_path_to_url(here) + '/'
761
 
        t = transport.get_transport(here_url)
762
 
        self.assertIsInstance(t, local.LocalTransport)
763
 
        self.assertEqual(t.base, here_url)
 
629
        t = get_transport(here_url)
 
630
        self.assertIsInstance(t, LocalTransport)
 
631
        self.assertEquals(t.base, here_url)
764
632
 
765
633
    def test_local_abspath(self):
766
634
        here = osutils.abspath('.')
767
 
        t = transport.get_transport(here)
768
 
        self.assertEqual(t.local_abspath(''), here)
769
 
 
770
 
 
771
 
class TestLocalTransportMutation(tests.TestCaseInTempDir):
772
 
 
773
 
    def test_local_transport_mkdir(self):
774
 
        here = osutils.abspath('.')
775
 
        t = transport.get_transport(here)
776
 
        t.mkdir('test')
777
 
        self.assertTrue(os.path.exists('test'))
778
 
 
779
 
    def test_local_transport_mkdir_permission_denied(self):
780
 
        # See https://bugs.launchpad.net/bzr/+bug/606537
781
 
        here = osutils.abspath('.')
782
 
        t = transport.get_transport(here)
783
 
 
784
 
        def fake_chmod(path, mode):
785
 
            e = OSError('permission denied')
786
 
            e.errno = errno.EPERM
787
 
            raise e
788
 
        self.overrideAttr(os, 'chmod', fake_chmod)
789
 
        t.mkdir('test')
790
 
        t.mkdir('test2', mode=0o707)
791
 
        self.assertTrue(os.path.exists('test'))
792
 
        self.assertTrue(os.path.exists('test2'))
793
 
 
794
 
 
795
 
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
796
 
 
797
 
    def test_local_fdatasync_calls_fdatasync(self):
798
 
        """Check fdatasync on a stream tries to flush the data to the OS.
799
 
 
800
 
        We can't easily observe the external effect but we can at least see
801
 
        it's called.
802
 
        """
803
 
        sentinel = object()
804
 
        fdatasync = getattr(os, 'fdatasync', sentinel)
805
 
        if fdatasync is sentinel:
806
 
            raise tests.TestNotApplicable('fdatasync not supported')
807
 
        t = self.get_transport('.')
808
 
        calls = self.recordCalls(os, 'fdatasync')
809
 
        w = t.open_write_stream('out')
810
 
        w.write(b'foo')
811
 
        w.fdatasync()
812
 
        with open('out', 'rb') as f:
813
 
            # Should have been flushed.
814
 
            self.assertEqual(f.read(), b'foo')
815
 
        self.assertEqual(len(calls), 1, calls)
816
 
 
817
 
    def test_missing_directory(self):
818
 
        t = self.get_transport('.')
819
 
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
820
 
 
821
 
 
822
 
class TestWin32LocalTransport(tests.TestCase):
 
635
        t = get_transport(here)
 
636
        self.assertEquals(t.local_abspath(''), here)
 
637
 
 
638
 
 
639
class TestWin32LocalTransport(TestCase):
823
640
 
824
641
    def test_unc_clone_to_root(self):
825
 
        self.requireFeature(features.win32_feature)
826
642
        # Win32 UNC path like \\HOST\path
827
643
        # clone to root should stop at least at \\HOST part
828
644
        # not on \\
829
 
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
830
 
        for i in range(4):
 
645
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
646
        for i in xrange(4):
831
647
            t = t.clone('..')
832
 
        self.assertEqual(t.base, 'file://HOST/')
 
648
        self.assertEquals(t.base, 'file://HOST/')
833
649
        # make sure we reach the root
834
650
        t = t.clone('..')
835
 
        self.assertEqual(t.base, 'file://HOST/')
836
 
 
837
 
 
838
 
class TestConnectedTransport(tests.TestCase):
 
651
        self.assertEquals(t.base, 'file://HOST/')
 
652
 
 
653
 
 
654
class TestConnectedTransport(TestCase):
839
655
    """Tests for connected to remote server transports"""
840
656
 
841
657
    def test_parse_url(self):
842
 
        t = transport.ConnectedTransport(
843
 
            'http://simple.example.com/home/source')
844
 
        self.assertEqual(t._parsed_url.host, 'simple.example.com')
845
 
        self.assertEqual(t._parsed_url.port, None)
846
 
        self.assertEqual(t._parsed_url.path, '/home/source/')
847
 
        self.assertTrue(t._parsed_url.user is None)
848
 
        self.assertTrue(t._parsed_url.password is None)
 
658
        t = ConnectedTransport('http://simple.example.com/home/source')
 
659
        self.assertEquals(t._host, 'simple.example.com')
 
660
        self.assertEquals(t._port, None)
 
661
        self.assertEquals(t._path, '/home/source/')
 
662
        self.failUnless(t._user is None)
 
663
        self.failUnless(t._password is None)
849
664
 
850
 
        self.assertEqual(t.base, 'http://simple.example.com/home/source/')
 
665
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
851
666
 
852
667
    def test_parse_url_with_at_in_user(self):
853
668
        # Bug 228058
854
 
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
855
 
        self.assertEqual(t._parsed_url.user, 'user@host.com')
 
669
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
 
670
        self.assertEquals(t._user, 'user@host.com')
856
671
 
857
672
    def test_parse_quoted_url(self):
858
 
        t = transport.ConnectedTransport(
859
 
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
860
 
        self.assertEqual(t._parsed_url.host, 'exAmple.com')
861
 
        self.assertEqual(t._parsed_url.port, 2222)
862
 
        self.assertEqual(t._parsed_url.user, 'robey')
863
 
        self.assertEqual(t._parsed_url.password, 'h@t')
864
 
        self.assertEqual(t._parsed_url.path, '/path/')
 
673
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
674
        self.assertEquals(t._host, 'exAmple.com')
 
675
        self.assertEquals(t._port, 2222)
 
676
        self.assertEquals(t._user, 'robey')
 
677
        self.assertEquals(t._password, 'h@t')
 
678
        self.assertEquals(t._path, '/path/')
865
679
 
866
680
        # Base should not keep track of the password
867
 
        self.assertEqual(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
 
681
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
868
682
 
869
683
    def test_parse_invalid_url(self):
870
 
        self.assertRaises(urlutils.InvalidURL,
871
 
                          transport.ConnectedTransport,
 
684
        self.assertRaises(errors.InvalidURL,
 
685
                          ConnectedTransport,
872
686
                          'sftp://lily.org:~janneke/public/bzr/gub')
873
687
 
874
688
    def test_relpath(self):
875
 
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
 
689
        t = ConnectedTransport('sftp://user@host.com/abs/path')
876
690
 
877
 
        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
878
 
                         'sub')
 
691
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
879
692
        self.assertRaises(errors.PathNotChild, t.relpath,
880
693
                          'http://user@host.com/abs/path/sub')
881
694
        self.assertRaises(errors.PathNotChild, t.relpath,
885
698
        self.assertRaises(errors.PathNotChild, t.relpath,
886
699
                          'sftp://user@host.com:33/abs/path/sub')
887
700
        # Make sure it works when we don't supply a username
888
 
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
889
 
        self.assertEqual(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
701
        t = ConnectedTransport('sftp://host.com/abs/path')
 
702
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
890
703
 
891
704
        # Make sure it works when parts of the path will be url encoded
892
 
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
893
 
        self.assertEqual(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
705
        t = ConnectedTransport('sftp://host.com/dev/%path')
 
706
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
894
707
 
895
708
    def test_connection_sharing_propagate_credentials(self):
896
 
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
897
 
        self.assertEqual('user', t._parsed_url.user)
898
 
        self.assertEqual('host.com', t._parsed_url.host)
 
709
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
710
        self.assertEquals('user', t._user)
 
711
        self.assertEquals('host.com', t._host)
899
712
        self.assertIs(None, t._get_connection())
900
 
        self.assertIs(None, t._parsed_url.password)
 
713
        self.assertIs(None, t._password)
901
714
        c = t.clone('subdir')
902
715
        self.assertIs(None, c._get_connection())
903
 
        self.assertIs(None, t._parsed_url.password)
 
716
        self.assertIs(None, t._password)
904
717
 
905
718
        # Simulate the user entering a password
906
719
        password = 'secret'
920
733
        self.assertIs(new_password, c._get_credentials())
921
734
 
922
735
 
923
 
class TestReusedTransports(tests.TestCase):
 
736
class TestReusedTransports(TestCase):
924
737
    """Tests for transport reuse"""
925
738
 
926
739
    def test_reuse_same_transport(self):
927
740
        possible_transports = []
928
 
        t1 = transport.get_transport_from_url(
929
 
            'http://foo/', possible_transports=possible_transports)
 
741
        t1 = get_transport('http://foo/',
 
742
                           possible_transports=possible_transports)
930
743
        self.assertEqual([t1], possible_transports)
931
 
        t2 = transport.get_transport_from_url('http://foo/',
932
 
                                              possible_transports=[t1])
 
744
        t2 = get_transport('http://foo/', possible_transports=[t1])
933
745
        self.assertIs(t1, t2)
934
746
 
935
747
        # Also check that final '/' are handled correctly
936
 
        t3 = transport.get_transport_from_url('http://foo/path/')
937
 
        t4 = transport.get_transport_from_url('http://foo/path',
938
 
                                              possible_transports=[t3])
 
748
        t3 = get_transport('http://foo/path/')
 
749
        t4 = get_transport('http://foo/path', possible_transports=[t3])
939
750
        self.assertIs(t3, t4)
940
751
 
941
 
        t5 = transport.get_transport_from_url('http://foo/path')
942
 
        t6 = transport.get_transport_from_url('http://foo/path/',
943
 
                                              possible_transports=[t5])
 
752
        t5 = get_transport('http://foo/path')
 
753
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
944
754
        self.assertIs(t5, t6)
945
755
 
946
756
    def test_don_t_reuse_different_transport(self):
947
 
        t1 = transport.get_transport_from_url('http://foo/path')
948
 
        t2 = transport.get_transport_from_url('http://bar/path',
949
 
                                              possible_transports=[t1])
 
757
        t1 = get_transport('http://foo/path')
 
758
        t2 = get_transport('http://bar/path', possible_transports=[t1])
950
759
        self.assertIsNot(t1, t2)
951
760
 
952
761
 
953
 
class TestTransportTrace(tests.TestCase):
 
762
class TestTransportTrace(TestCase):
954
763
 
955
 
    def test_decorator(self):
956
 
        t = transport.get_transport_from_url('trace+memory://')
 
764
    def test_get(self):
 
765
        transport = get_transport('trace+memory://')
957
766
        self.assertIsInstance(
958
 
            t, breezy.transport.trace.TransportTraceDecorator)
 
767
            transport, bzrlib.transport.trace.TransportTraceDecorator)
959
768
 
960
769
    def test_clone_preserves_activity(self):
961
 
        t = transport.get_transport_from_url('trace+memory://')
962
 
        t2 = t.clone('.')
963
 
        self.assertTrue(t is not t2)
964
 
        self.assertTrue(t._activity is t2._activity)
 
770
        transport = get_transport('trace+memory://')
 
771
        transport2 = transport.clone('.')
 
772
        self.assertTrue(transport is not transport2)
 
773
        self.assertTrue(transport._activity is transport2._activity)
965
774
 
966
775
    # the following specific tests are for the operations that have made use of
967
776
    # logging in tests; we could test every single operation but doing that
968
777
    # still won't cause a test failure when the top level Transport API
969
778
    # changes; so there is little return doing that.
970
779
    def test_get(self):
971
 
        t = transport.get_transport_from_url('trace+memory:///')
972
 
        t.put_bytes('foo', b'barish')
973
 
        t.get('foo')
 
780
        transport = get_transport('trace+memory:///')
 
781
        transport.put_bytes('foo', 'barish')
 
782
        transport.get('foo')
974
783
        expected_result = []
975
784
        # put_bytes records the bytes, not the content to avoid memory
976
785
        # pressure.
977
786
        expected_result.append(('put_bytes', 'foo', 6, None))
978
787
        # get records the file name only.
979
788
        expected_result.append(('get', 'foo'))
980
 
        self.assertEqual(expected_result, t._activity)
 
789
        self.assertEqual(expected_result, transport._activity)
981
790
 
982
791
    def test_readv(self):
983
 
        t = transport.get_transport_from_url('trace+memory:///')
984
 
        t.put_bytes('foo', b'barish')
985
 
        list(t.readv('foo', [(0, 1), (3, 2)],
986
 
                     adjust_for_latency=True, upper_limit=6))
 
792
        transport = get_transport('trace+memory:///')
 
793
        transport.put_bytes('foo', 'barish')
 
794
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
 
795
            upper_limit=6))
987
796
        expected_result = []
988
797
        # put_bytes records the bytes, not the content to avoid memory
989
798
        # pressure.
990
799
        expected_result.append(('put_bytes', 'foo', 6, None))
991
800
        # readv records the supplied offset request
992
801
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
993
 
        self.assertEqual(expected_result, t._activity)
994
 
 
995
 
 
996
 
class TestSSHConnections(tests.TestCaseWithTransport):
997
 
 
998
 
    def test_bzr_connect_to_bzr_ssh(self):
999
 
        """get_transport of a bzr+ssh:// behaves correctly.
1000
 
 
1001
 
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
1002
 
        """
1003
 
        # This test actually causes a bzr instance to be invoked, which is very
1004
 
        # expensive: it should be the only such test in the test suite.
1005
 
        # A reasonable evolution for this would be to simply check inside
1006
 
        # check_channel_exec_request that the command is appropriate, and then
1007
 
        # satisfy requests in-process.
1008
 
        self.requireFeature(features.paramiko)
1009
 
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
1010
 
        # override the interface (doesn't change self._vendor).
1011
 
        # Note that this does encryption, so can be slow.
1012
 
        from breezy.tests import stub_sftp
1013
 
 
1014
 
        # Start an SSH server
1015
 
        self.command_executed = []
1016
 
        # XXX: This is horrible -- we define a really dumb SSH server that
1017
 
        # executes commands, and manage the hooking up of stdin/out/err to the
1018
 
        # SSH channel ourselves.  Surely this has already been implemented
1019
 
        # elsewhere?
1020
 
        started = []
1021
 
 
1022
 
        class StubSSHServer(stub_sftp.StubServer):
1023
 
 
1024
 
            test = self
1025
 
 
1026
 
            def check_channel_exec_request(self, channel, command):
1027
 
                self.test.command_executed.append(command)
1028
 
                proc = subprocess.Popen(
1029
 
                    command, shell=True, stdin=subprocess.PIPE,
1030
 
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
1031
 
                    bufsize=0)
1032
 
 
1033
 
                # XXX: horribly inefficient, not to mention ugly.
1034
 
                # Start a thread for each of stdin/out/err, and relay bytes
1035
 
                # from the subprocess to channel and vice versa.
1036
 
                def ferry_bytes(read, write, close):
1037
 
                    while True:
1038
 
                        bytes = read(1)
1039
 
                        if bytes == b'':
1040
 
                            close()
1041
 
                            break
1042
 
                        write(bytes)
1043
 
 
1044
 
                file_functions = [
1045
 
                    (channel.recv, proc.stdin.write, proc.stdin.close),
1046
 
                    (proc.stdout.read, channel.sendall, channel.close),
1047
 
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
1048
 
                started.append(proc)
1049
 
                for read, write, close in file_functions:
1050
 
                    t = threading.Thread(
1051
 
                        target=ferry_bytes, args=(read, write, close))
1052
 
                    t.start()
1053
 
                    started.append(t)
1054
 
 
1055
 
                return True
1056
 
 
1057
 
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
1058
 
        # We *don't* want to override the default SSH vendor: the detected one
1059
 
        # is the one to use.
1060
 
 
1061
 
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
1062
 
        # inherits from SFTPServer which forces the SSH vendor to
1063
 
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
1064
 
        self.start_server(ssh_server)
1065
 
        port = ssh_server.port
1066
 
 
1067
 
        if sys.platform == 'win32':
1068
 
            bzr_remote_path = sys.executable + ' ' + self.get_brz_path()
1069
 
        else:
1070
 
            bzr_remote_path = self.get_brz_path()
1071
 
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
1072
 
 
1073
 
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
1074
 
        # variable is used to tell bzr what command to run on the remote end.
1075
 
        path_to_branch = osutils.abspath('.')
1076
 
        if sys.platform == 'win32':
1077
 
            # On Windows, we export all drives as '/C:/, etc. So we need to
1078
 
            # prefix a '/' to get the right path.
1079
 
            path_to_branch = '/' + path_to_branch
1080
 
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
1081
 
        t = transport.get_transport(url)
1082
 
        self.permit_url(t.base)
1083
 
        t.mkdir('foo')
1084
 
 
1085
 
        self.assertEqual(
1086
 
            [b'%s serve --inet --directory=/ --allow-writes' %
1087
 
                bzr_remote_path.encode()],
1088
 
            self.command_executed)
1089
 
        # Make sure to disconnect, so that the remote process can stop, and we
1090
 
        # can cleanup. Then pause the test until everything is shutdown
1091
 
        t._client._medium.disconnect()
1092
 
        if not started:
1093
 
            return
1094
 
        # First wait for the subprocess
1095
 
        started[0].wait()
1096
 
        # And the rest are threads
1097
 
        for t in started[1:]:
1098
 
            t.join()
1099
 
 
1100
 
 
1101
 
class TestUnhtml(tests.TestCase):
1102
 
 
1103
 
    """Tests for unhtml_roughly"""
1104
 
 
1105
 
    def test_truncation(self):
1106
 
        fake_html = "<p>something!\n" * 1000
1107
 
        result = http.unhtml_roughly(fake_html)
1108
 
        self.assertEqual(len(result), 1000)
1109
 
        self.assertStartsWith(result, " something!")
 
802
        self.assertEqual(expected_result, transport._activity)