/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Marius Kruger
  • Date: 2010-07-10 21:28:56 UTC
  • mto: (5384.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 5385.
  • Revision ID: marius.kruger@enerweb.co.za-20100710212856-uq4ji3go0u5se7hx
* Update documentation
* add NEWS

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
 
18
 
import errno
 
18
from cStringIO import StringIO
19
19
import os
20
20
import subprocess
21
21
import sys
22
22
import threading
23
23
 
24
 
from .. import (
 
24
from bzrlib import (
25
25
    errors,
26
26
    osutils,
27
27
    tests,
28
28
    transport,
29
29
    urlutils,
30
30
    )
31
 
from ..sixish import (
32
 
    BytesIO,
33
 
    )
34
 
from ..transport import (
 
31
from bzrlib.transport import (
35
32
    chroot,
36
33
    fakenfs,
37
 
    http,
38
34
    local,
39
35
    memory,
40
36
    pathfilter,
41
37
    readonly,
42
38
    )
43
 
import breezy.transport.trace
44
 
from . import (
 
39
from bzrlib.tests import (
45
40
    features,
46
41
    test_server,
47
42
    )
53
48
class TestTransport(tests.TestCase):
54
49
    """Test the non transport-concrete class functionality."""
55
50
 
 
51
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
 
52
    # of try/finally -- vila 20100205
 
53
 
56
54
    def test__get_set_protocol_handlers(self):
57
55
        handlers = transport._get_protocol_handlers()
58
 
        self.assertNotEqual([], handlers.keys())
59
 
        transport._clear_protocol_handlers()
60
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
61
 
        self.assertEqual([], transport._get_protocol_handlers().keys())
 
56
        self.assertNotEqual([], handlers.keys( ))
 
57
        try:
 
58
            transport._clear_protocol_handlers()
 
59
            self.assertEqual([], transport._get_protocol_handlers().keys())
 
60
        finally:
 
61
            transport._set_protocol_handlers(handlers)
62
62
 
63
63
    def test_get_transport_modules(self):
64
64
        handlers = transport._get_protocol_handlers()
65
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
66
65
        # don't pollute the current handlers
67
66
        transport._clear_protocol_handlers()
68
 
 
69
67
        class SampleHandler(object):
70
68
            """I exist, isnt that enough?"""
71
 
        transport._clear_protocol_handlers()
72
 
        transport.register_transport_proto('foo')
73
 
        transport.register_lazy_transport('foo',
74
 
                                          'breezy.tests.test_transport',
75
 
                                          'TestTransport.SampleHandler')
76
 
        transport.register_transport_proto('bar')
77
 
        transport.register_lazy_transport('bar',
78
 
                                          'breezy.tests.test_transport',
79
 
                                          'TestTransport.SampleHandler')
80
 
        self.assertEqual([SampleHandler.__module__,
81
 
                          'breezy.transport.chroot',
82
 
                          'breezy.transport.pathfilter'],
83
 
                         transport._get_transport_modules())
 
69
        try:
 
70
            transport._clear_protocol_handlers()
 
71
            transport.register_transport_proto('foo')
 
72
            transport.register_lazy_transport('foo',
 
73
                                              'bzrlib.tests.test_transport',
 
74
                                              'TestTransport.SampleHandler')
 
75
            transport.register_transport_proto('bar')
 
76
            transport.register_lazy_transport('bar',
 
77
                                              'bzrlib.tests.test_transport',
 
78
                                              'TestTransport.SampleHandler')
 
79
            self.assertEqual([SampleHandler.__module__,
 
80
                              'bzrlib.transport.chroot',
 
81
                              'bzrlib.transport.pathfilter'],
 
82
                             transport._get_transport_modules())
 
83
        finally:
 
84
            transport._set_protocol_handlers(handlers)
84
85
 
85
86
    def test_transport_dependency(self):
86
87
        """Transport with missing dependency causes no error"""
87
88
        saved_handlers = transport._get_protocol_handlers()
88
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
89
89
        # don't pollute the current handlers
90
90
        transport._clear_protocol_handlers()
91
 
        transport.register_transport_proto('foo')
92
 
        transport.register_lazy_transport(
93
 
            'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
94
91
        try:
95
 
            transport.get_transport_from_url('foo://fooserver/foo')
96
 
        except errors.UnsupportedProtocol as e:
97
 
            self.assertEqual('Unsupported protocol'
98
 
                             ' for url "foo://fooserver/foo":'
99
 
                             ' Unable to import library "some_lib":'
100
 
                             ' testing missing dependency', str(e))
101
 
        else:
102
 
            self.fail('Did not raise UnsupportedProtocol')
 
92
            transport.register_transport_proto('foo')
 
93
            transport.register_lazy_transport(
 
94
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
95
            try:
 
96
                transport.get_transport('foo://fooserver/foo')
 
97
            except errors.UnsupportedProtocol, e:
 
98
                e_str = str(e)
 
99
                self.assertEquals('Unsupported protocol'
 
100
                                  ' for url "foo://fooserver/foo":'
 
101
                                  ' Unable to import library "some_lib":'
 
102
                                  ' testing missing dependency', str(e))
 
103
            else:
 
104
                self.fail('Did not raise UnsupportedProtocol')
 
105
        finally:
 
106
            # restore original values
 
107
            transport._set_protocol_handlers(saved_handlers)
103
108
 
104
109
    def test_transport_fallback(self):
105
110
        """Transport with missing dependency causes no error"""
106
111
        saved_handlers = transport._get_protocol_handlers()
107
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
108
 
        transport._clear_protocol_handlers()
109
 
        transport.register_transport_proto('foo')
110
 
        transport.register_lazy_transport(
111
 
            'foo', 'breezy.tests.test_transport', 'BackupTransportHandler')
112
 
        transport.register_lazy_transport(
113
 
            'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
114
 
        t = transport.get_transport_from_url('foo://fooserver/foo')
115
 
        self.assertTrue(isinstance(t, BackupTransportHandler))
 
112
        try:
 
113
            transport._clear_protocol_handlers()
 
114
            transport.register_transport_proto('foo')
 
115
            transport.register_lazy_transport(
 
116
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
117
            transport.register_lazy_transport(
 
118
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
119
            t = transport.get_transport('foo://fooserver/foo')
 
120
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
121
        finally:
 
122
            transport._set_protocol_handlers(saved_handlers)
116
123
 
117
124
    def test_ssh_hints(self):
118
125
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
119
126
        try:
120
 
            transport.get_transport_from_url('ssh://fooserver/foo')
121
 
        except errors.UnsupportedProtocol as e:
122
 
            self.assertEqual(
123
 
                'Unsupported protocol'
124
 
                ' for url "ssh://fooserver/foo":'
125
 
                ' Use bzr+ssh for Bazaar operations over SSH, '
126
 
                'e.g. "bzr+ssh://fooserver/foo". Use git+ssh '
127
 
                'for Git operations over SSH, e.g. "git+ssh://fooserver/foo".',
128
 
                str(e))
 
127
            transport.get_transport('ssh://fooserver/foo')
 
128
        except errors.UnsupportedProtocol, e:
 
129
            e_str = str(e)
 
130
            self.assertEquals('Unsupported protocol'
 
131
                              ' for url "ssh://fooserver/foo":'
 
132
                              ' bzr supports bzr+ssh to operate over ssh,'
 
133
                              ' use "bzr+ssh://fooserver/foo".',
 
134
                              str(e))
129
135
        else:
130
136
            self.fail('Did not raise UnsupportedProtocol')
131
137
 
134
140
        a_file = transport.LateReadError('a path')
135
141
        try:
136
142
            a_file.read()
137
 
        except errors.ReadError as error:
 
143
        except errors.ReadError, error:
138
144
            self.assertEqual('a path', error.path)
139
145
        self.assertRaises(errors.ReadError, a_file.read, 40)
140
146
        a_file.close()
141
147
 
 
148
    def test__combine_paths(self):
 
149
        t = transport.Transport('/')
 
150
        self.assertEqual('/home/sarah/project/foo',
 
151
                         t._combine_paths('/home/sarah', 'project/foo'))
 
152
        self.assertEqual('/etc',
 
153
                         t._combine_paths('/home/sarah', '../../etc'))
 
154
        self.assertEqual('/etc',
 
155
                         t._combine_paths('/home/sarah', '../../../etc'))
 
156
        self.assertEqual('/etc',
 
157
                         t._combine_paths('/home/sarah', '/etc'))
 
158
 
142
159
    def test_local_abspath_non_local_transport(self):
143
160
        # the base implementation should throw
144
161
        t = memory.MemoryTransport()
164
181
    def test_coalesce_unrelated(self):
165
182
        self.check([(0, 10, [(0, 10)]),
166
183
                    (20, 10, [(0, 10)]),
167
 
                    ], [(0, 10), (20, 10)])
 
184
                   ], [(0, 10), (20, 10)])
168
185
 
169
186
    def test_coalesce_unsorted(self):
170
187
        self.check([(20, 10, [(0, 10)]),
171
188
                    (0, 10, [(0, 10)]),
172
 
                    ], [(20, 10), (0, 10)])
 
189
                   ], [(20, 10), (0, 10)])
173
190
 
174
191
    def test_coalesce_nearby(self):
175
192
        self.check([(0, 20, [(0, 10), (10, 10)])],
177
194
 
178
195
    def test_coalesce_overlapped(self):
179
196
        self.assertRaises(ValueError,
180
 
                          self.check, [(0, 15, [(0, 10), (5, 10)])],
181
 
                          [(0, 10), (5, 10)])
 
197
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
198
                        [(0, 10), (5, 10)])
182
199
 
183
200
    def test_coalesce_limit(self):
184
201
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
185
202
                              (30, 10), (40, 10)]),
186
203
                    (60, 50, [(0, 10), (10, 10), (20, 10),
187
204
                              (30, 10), (40, 10)]),
188
 
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
189
 
                        (50, 10), (60, 10), (70, 10), (80, 10),
190
 
                        (90, 10), (100, 10)],
191
 
                   limit=5)
 
205
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
206
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
207
                       (90, 10), (100, 10)],
 
208
                    limit=5)
192
209
 
193
210
    def test_coalesce_no_limit(self):
194
211
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
195
212
                               (30, 10), (40, 10), (50, 10),
196
213
                               (60, 10), (70, 10), (80, 10),
197
214
                               (90, 10)]),
198
 
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
199
 
                        (50, 10), (60, 10), (70, 10), (80, 10),
200
 
                        (90, 10), (100, 10)])
 
215
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
216
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
217
                       (90, 10), (100, 10)])
201
218
 
202
219
    def test_coalesce_fudge(self):
203
220
        self.check([(10, 30, [(0, 10), (20, 10)]),
204
 
                    (100, 10, [(0, 10)]),
205
 
                    ], [(10, 10), (30, 10), (100, 10)],
206
 
                   fudge=10)
207
 
 
 
221
                    (100, 10, [(0, 10),]),
 
222
                   ], [(10, 10), (30, 10), (100, 10)],
 
223
                   fudge=10
 
224
                  )
208
225
    def test_coalesce_max_size(self):
209
226
        self.check([(10, 20, [(0, 10), (10, 10)]),
210
227
                    (30, 50, [(0, 50)]),
211
228
                    # If one range is above max_size, it gets its own coalesced
212
229
                    # offset
213
 
                    (100, 80, [(0, 80)]), ],
 
230
                    (100, 80, [(0, 80),]),],
214
231
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
215
 
                   max_size=50)
 
232
                   max_size=50
 
233
                  )
216
234
 
217
235
    def test_coalesce_no_max_size(self):
218
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
 
236
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
219
237
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
220
 
                   )
 
238
                  )
221
239
 
222
240
    def test_coalesce_default_limit(self):
223
241
        # By default we use a 100MB max size.
224
 
        ten_mb = 10 * 1024 * 1024
225
 
        self.check(
226
 
            [(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
227
 
             (10 * ten_mb, ten_mb, [(0, ten_mb)])],
228
 
            [(i * ten_mb, ten_mb) for i in range(11)])
229
 
        self.check(
230
 
            [(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
231
 
            [(i * ten_mb, ten_mb) for i in range(11)],
232
 
            max_size=1 * 1024 * 1024 * 1024)
 
242
        ten_mb = 10*1024*1024
 
243
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
244
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
245
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
246
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
247
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
248
                   max_size=1*1024*1024*1024)
233
249
 
234
250
 
235
251
class TestMemoryServer(tests.TestCase):
239
255
        server.start_server()
240
256
        url = server.get_url()
241
257
        self.assertTrue(url in transport.transport_list_registry)
242
 
        t = transport.get_transport_from_url(url)
 
258
        t = transport.get_transport(url)
243
259
        del t
244
260
        server.stop_server()
245
261
        self.assertFalse(url in transport.transport_list_registry)
272
288
 
273
289
    def test_append_and_get(self):
274
290
        t = memory.MemoryTransport()
275
 
        t.append_bytes('path', b'content')
276
 
        self.assertEqual(t.get('path').read(), b'content')
277
 
        t.append_file('path', BytesIO(b'content'))
278
 
        with t.get('path') as f:
279
 
            self.assertEqual(f.read(), b'contentcontent')
 
291
        t.append_bytes('path', 'content')
 
292
        self.assertEqual(t.get('path').read(), 'content')
 
293
        t.append_file('path', StringIO('content'))
 
294
        self.assertEqual(t.get('path').read(), 'contentcontent')
280
295
 
281
296
    def test_put_and_get(self):
282
297
        t = memory.MemoryTransport()
283
 
        t.put_file('path', BytesIO(b'content'))
284
 
        self.assertEqual(t.get('path').read(), b'content')
285
 
        t.put_bytes('path', b'content')
286
 
        self.assertEqual(t.get('path').read(), b'content')
 
298
        t.put_file('path', StringIO('content'))
 
299
        self.assertEqual(t.get('path').read(), 'content')
 
300
        t.put_bytes('path', 'content')
 
301
        self.assertEqual(t.get('path').read(), 'content')
287
302
 
288
303
    def test_append_without_dir_fails(self):
289
304
        t = memory.MemoryTransport()
290
305
        self.assertRaises(errors.NoSuchFile,
291
 
                          t.append_bytes, 'dir/path', b'content')
 
306
                          t.append_bytes, 'dir/path', 'content')
292
307
 
293
308
    def test_put_without_dir_fails(self):
294
309
        t = memory.MemoryTransport()
295
310
        self.assertRaises(errors.NoSuchFile,
296
 
                          t.put_file, 'dir/path', BytesIO(b'content'))
 
311
                          t.put_file, 'dir/path', StringIO('content'))
297
312
 
298
313
    def test_get_missing(self):
299
314
        transport = memory.MemoryTransport()
301
316
 
302
317
    def test_has_missing(self):
303
318
        t = memory.MemoryTransport()
304
 
        self.assertEqual(False, t.has('foo'))
 
319
        self.assertEquals(False, t.has('foo'))
305
320
 
306
321
    def test_has_present(self):
307
322
        t = memory.MemoryTransport()
308
 
        t.append_bytes('foo', b'content')
309
 
        self.assertEqual(True, t.has('foo'))
 
323
        t.append_bytes('foo', 'content')
 
324
        self.assertEquals(True, t.has('foo'))
310
325
 
311
326
    def test_list_dir(self):
312
327
        t = memory.MemoryTransport()
313
 
        t.put_bytes('foo', b'content')
 
328
        t.put_bytes('foo', 'content')
314
329
        t.mkdir('dir')
315
 
        t.put_bytes('dir/subfoo', b'content')
316
 
        t.put_bytes('dirlike', b'content')
 
330
        t.put_bytes('dir/subfoo', 'content')
 
331
        t.put_bytes('dirlike', 'content')
317
332
 
318
 
        self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
319
 
        self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
 
333
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
334
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
320
335
 
321
336
    def test_mkdir(self):
322
337
        t = memory.MemoryTransport()
323
338
        t.mkdir('dir')
324
 
        t.append_bytes('dir/path', b'content')
325
 
        with t.get('dir/path') as f:
326
 
            self.assertEqual(f.read(), b'content')
 
339
        t.append_bytes('dir/path', 'content')
 
340
        self.assertEqual(t.get('dir/path').read(), 'content')
327
341
 
328
342
    def test_mkdir_missing_parent(self):
329
343
        t = memory.MemoryTransport()
342
356
    def test_iter_files_recursive(self):
343
357
        t = memory.MemoryTransport()
344
358
        t.mkdir('dir')
345
 
        t.put_bytes('dir/foo', b'content')
346
 
        t.put_bytes('dir/bar', b'content')
347
 
        t.put_bytes('bar', b'content')
 
359
        t.put_bytes('dir/foo', 'content')
 
360
        t.put_bytes('dir/bar', 'content')
 
361
        t.put_bytes('bar', 'content')
348
362
        paths = set(t.iter_files_recursive())
349
 
        self.assertEqual({'dir/foo', 'dir/bar', 'bar'}, paths)
 
363
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
350
364
 
351
365
    def test_stat(self):
352
366
        t = memory.MemoryTransport()
353
 
        t.put_bytes('foo', b'content')
354
 
        t.put_bytes('bar', b'phowar')
 
367
        t.put_bytes('foo', 'content')
 
368
        t.put_bytes('bar', 'phowar')
355
369
        self.assertEqual(7, t.stat('foo').st_size)
356
370
        self.assertEqual(6, t.stat('bar').st_size)
357
371
 
362
376
    def test_abspath(self):
363
377
        # The abspath is always relative to the chroot_url.
364
378
        server = chroot.ChrootServer(
365
 
            transport.get_transport_from_url('memory:///foo/bar/'))
 
379
            transport.get_transport('memory:///foo/bar/'))
366
380
        self.start_server(server)
367
 
        t = transport.get_transport_from_url(server.get_url())
 
381
        t = transport.get_transport(server.get_url())
368
382
        self.assertEqual(server.get_url(), t.abspath('/'))
369
383
 
370
384
        subdir_t = t.clone('subdir')
372
386
 
373
387
    def test_clone(self):
374
388
        server = chroot.ChrootServer(
375
 
            transport.get_transport_from_url('memory:///foo/bar/'))
 
389
            transport.get_transport('memory:///foo/bar/'))
376
390
        self.start_server(server)
377
 
        t = transport.get_transport_from_url(server.get_url())
 
391
        t = transport.get_transport(server.get_url())
378
392
        # relpath from root and root path are the same
379
393
        relpath_cloned = t.clone('foo')
380
394
        abspath_cloned = t.clone('/foo')
389
403
        This is so that it is not possible to escape a chroot by doing::
390
404
            url = chroot_transport.base
391
405
            parent_url = urlutils.join(url, '..')
392
 
            new_t = transport.get_transport_from_url(parent_url)
 
406
            new_t = transport.get_transport(parent_url)
393
407
        """
394
408
        server = chroot.ChrootServer(
395
 
            transport.get_transport_from_url('memory:///path/subpath'))
 
409
            transport.get_transport('memory:///path/subpath'))
396
410
        self.start_server(server)
397
 
        t = transport.get_transport_from_url(server.get_url())
398
 
        new_t = transport.get_transport_from_url(t.base)
 
411
        t = transport.get_transport(server.get_url())
 
412
        new_t = transport.get_transport(t.base)
399
413
        self.assertEqual(t.server, new_t.server)
400
414
        self.assertEqual(t.base, new_t.base)
401
415
 
406
420
        This is so that it is not possible to escape a chroot by doing::
407
421
            url = chroot_transport.base
408
422
            parent_url = urlutils.join(url, '..')
409
 
            new_t = transport.get_transport_from_url(parent_url)
 
423
            new_t = transport.get_transport(parent_url)
410
424
        """
411
 
        server = chroot.ChrootServer(
412
 
            transport.get_transport_from_url('memory:///path/'))
 
425
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
413
426
        self.start_server(server)
414
 
        t = transport.get_transport_from_url(server.get_url())
 
427
        t = transport.get_transport(server.get_url())
415
428
        self.assertRaises(
416
 
            urlutils.InvalidURLJoin, urlutils.join, t.base, '..')
 
429
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
417
430
 
418
431
 
419
432
class TestChrootServer(tests.TestCase):
427
440
        backing_transport = memory.MemoryTransport()
428
441
        server = chroot.ChrootServer(backing_transport)
429
442
        server.start_server()
430
 
        self.addCleanup(server.stop_server)
431
 
        self.assertTrue(server.scheme
432
 
                        in transport._get_protocol_handlers().keys())
 
443
        try:
 
444
            self.assertTrue(server.scheme
 
445
                            in transport._get_protocol_handlers().keys())
 
446
        finally:
 
447
            server.stop_server()
433
448
 
434
449
    def test_stop_server(self):
435
450
        backing_transport = memory.MemoryTransport()
443
458
        backing_transport = memory.MemoryTransport()
444
459
        server = chroot.ChrootServer(backing_transport)
445
460
        server.start_server()
446
 
        self.addCleanup(server.stop_server)
447
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
448
 
 
449
 
 
450
 
class TestHooks(tests.TestCase):
451
 
    """Basic tests for transport hooks"""
452
 
 
453
 
    def _get_connected_transport(self):
454
 
        return transport.ConnectedTransport("bogus:nowhere")
455
 
 
456
 
    def test_transporthooks_initialisation(self):
457
 
        """Check all expected transport hook points are set up"""
458
 
        hookpoint = transport.TransportHooks()
459
 
        self.assertTrue("post_connect" in hookpoint,
460
 
                        "post_connect not in %s" % (hookpoint,))
461
 
 
462
 
    def test_post_connect(self):
463
 
        """Ensure the post_connect hook is called when _set_transport is"""
464
 
        calls = []
465
 
        transport.Transport.hooks.install_named_hook("post_connect",
466
 
                                                     calls.append, None)
467
 
        t = self._get_connected_transport()
468
 
        self.assertLength(0, calls)
469
 
        t._set_connection("connection", "auth")
470
 
        self.assertEqual(calls, [t])
 
461
        try:
 
462
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
463
        finally:
 
464
            server.stop_server()
471
465
 
472
466
 
473
467
class PathFilteringDecoratorTransportTest(tests.TestCase):
476
470
    def test_abspath(self):
477
471
        # The abspath is always relative to the base of the backing transport.
478
472
        server = pathfilter.PathFilteringServer(
479
 
            transport.get_transport_from_url('memory:///foo/bar/'),
 
473
            transport.get_transport('memory:///foo/bar/'),
480
474
            lambda x: x)
481
475
        server.start_server()
482
 
        t = transport.get_transport_from_url(server.get_url())
 
476
        t = transport.get_transport(server.get_url())
483
477
        self.assertEqual(server.get_url(), t.abspath('/'))
484
478
 
485
479
        subdir_t = t.clone('subdir')
488
482
 
489
483
    def make_pf_transport(self, filter_func=None):
490
484
        """Make a PathFilteringTransport backed by a MemoryTransport.
491
 
 
 
485
        
492
486
        :param filter_func: by default this will be a no-op function.  Use this
493
487
            parameter to override it."""
494
488
        if filter_func is None:
495
 
            def filter_func(x):
496
 
                return x
 
489
            filter_func = lambda x: x
497
490
        server = pathfilter.PathFilteringServer(
498
 
            transport.get_transport_from_url('memory:///foo/bar/'),
499
 
            filter_func)
 
491
            transport.get_transport('memory:///foo/bar/'), filter_func)
500
492
        server.start_server()
501
493
        self.addCleanup(server.stop_server)
502
 
        return transport.get_transport_from_url(server.get_url())
 
494
        return transport.get_transport(server.get_url())
503
495
 
504
496
    def test__filter(self):
505
497
        # _filter (with an identity func as filter_func) always returns
518
510
 
519
511
    def test_filter_invocation(self):
520
512
        filter_log = []
521
 
 
522
513
        def filter(path):
523
514
            filter_log.append(path)
524
515
            return path
549
540
        otherwise) the filtering by doing::
550
541
            url = filtered_transport.base
551
542
            parent_url = urlutils.join(url, '..')
552
 
            new_t = transport.get_transport_from_url(parent_url)
 
543
            new_t = transport.get_transport(parent_url)
553
544
        """
554
545
        t = self.make_pf_transport()
555
 
        new_t = transport.get_transport_from_url(t.base)
 
546
        new_t = transport.get_transport(t.base)
556
547
        self.assertEqual(t.server, new_t.server)
557
548
        self.assertEqual(t.base, new_t.base)
558
549
 
567
558
        self.assertEqual(True, t.is_readonly())
568
559
 
569
560
    def test_http_parameters(self):
570
 
        from breezy.tests.http_server import HttpServer
 
561
        from bzrlib.tests.http_server import HttpServer
571
562
        # connect to '.' via http which is not listable
572
563
        server = HttpServer()
573
564
        self.start_server(server)
574
 
        t = transport.get_transport_from_url('readonly+' + server.get_url())
575
 
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
 
565
        t = transport.get_transport('readonly+' + server.get_url())
 
566
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
576
567
        self.assertEqual(False, t.listable())
577
568
        self.assertEqual(True, t.is_readonly())
578
569
 
594
585
    def test_http_parameters(self):
595
586
        # the listable and is_readonly parameters
596
587
        # are not changed by the fakenfs decorator
597
 
        from breezy.tests.http_server import HttpServer
 
588
        from bzrlib.tests.http_server import HttpServer
598
589
        # connect to '.' via http which is not listable
599
590
        server = HttpServer()
600
591
        self.start_server(server)
610
601
        # the url should be decorated appropriately
611
602
        self.assertStartsWith(server.get_url(), 'fakenfs+')
612
603
        # and we should be able to get a transport for it
613
 
        t = transport.get_transport_from_url(server.get_url())
 
604
        t = transport.get_transport(server.get_url())
614
605
        # which must be a FakeNFSTransportDecorator instance.
615
606
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
616
607
 
628
619
 
629
620
    def get_vfat_transport(self, url):
630
621
        """Return vfat-backed transport for test directory"""
631
 
        from breezy.transport.fakevfat import FakeVFATTransportDecorator
 
622
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
632
623
        return FakeVFATTransportDecorator('vfat+' + url)
633
624
 
634
625
    def test_transport_creation(self):
635
 
        from breezy.transport.fakevfat import FakeVFATTransportDecorator
 
626
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
636
627
        t = self.get_vfat_transport('.')
637
628
        self.assertIsInstance(t, FakeVFATTransportDecorator)
638
629
 
663
654
 
664
655
    To verify a transport we need a server factory, which is a callable
665
656
    that accepts no parameters and returns an implementation of
666
 
    breezy.transport.Server.
 
657
    bzrlib.transport.Server.
667
658
 
668
659
    That Server is then used to construct transport instances and test
669
660
    the transport via loopback activity.
693
684
        base_url = self._server.get_url()
694
685
        url = self._adjust_url(base_url, relpath)
695
686
        # try getting the transport via the regular interface:
696
 
        t = transport.get_transport_from_url(url)
 
687
        t = transport.get_transport(url)
697
688
        # vila--20070607 if the following are commented out the test suite
698
689
        # still pass. Is this really still needed or was it a forgotten
699
690
        # temporary fix ?
704
695
        return t
705
696
 
706
697
 
707
 
class TestTransportFromPath(tests.TestCaseInTempDir):
708
 
 
709
 
    def test_with_path(self):
710
 
        t = transport.get_transport_from_path(self.test_dir)
711
 
        self.assertIsInstance(t, local.LocalTransport)
712
 
        self.assertEqual(t.base.rstrip("/"),
713
 
                         urlutils.local_path_to_url(self.test_dir))
714
 
 
715
 
    def test_with_url(self):
716
 
        t = transport.get_transport_from_path("file:")
717
 
        self.assertIsInstance(t, local.LocalTransport)
718
 
        self.assertEqual(
719
 
            t.base.rstrip("/"),
720
 
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
721
 
 
722
 
 
723
 
class TestTransportFromUrl(tests.TestCaseInTempDir):
724
 
 
725
 
    def test_with_path(self):
726
 
        self.assertRaises(urlutils.InvalidURL, transport.get_transport_from_url,
727
 
                          self.test_dir)
728
 
 
729
 
    def test_with_url(self):
730
 
        url = urlutils.local_path_to_url(self.test_dir)
731
 
        t = transport.get_transport_from_url(url)
732
 
        self.assertIsInstance(t, local.LocalTransport)
733
 
        self.assertEqual(t.base.rstrip("/"), url)
734
 
 
735
 
    def test_with_url_and_segment_parameters(self):
736
 
        url = urlutils.local_path_to_url(self.test_dir) + ",branch=foo"
737
 
        t = transport.get_transport_from_url(url)
738
 
        self.assertIsInstance(t, local.LocalTransport)
739
 
        self.assertEqual(t.base.rstrip("/"), url)
740
 
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
741
 
            f.write("data")
742
 
        self.assertTrue(t.has("afile"))
743
 
 
744
 
 
745
698
class TestLocalTransports(tests.TestCase):
746
699
 
747
700
    def test_get_transport_from_abspath(self):
748
701
        here = osutils.abspath('.')
749
702
        t = transport.get_transport(here)
750
703
        self.assertIsInstance(t, local.LocalTransport)
751
 
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')
 
704
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
752
705
 
753
706
    def test_get_transport_from_relpath(self):
 
707
        here = osutils.abspath('.')
754
708
        t = transport.get_transport('.')
755
709
        self.assertIsInstance(t, local.LocalTransport)
756
 
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')
 
710
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
757
711
 
758
712
    def test_get_transport_from_local_url(self):
759
713
        here = osutils.abspath('.')
760
714
        here_url = urlutils.local_path_to_url(here) + '/'
761
715
        t = transport.get_transport(here_url)
762
716
        self.assertIsInstance(t, local.LocalTransport)
763
 
        self.assertEqual(t.base, here_url)
 
717
        self.assertEquals(t.base, here_url)
764
718
 
765
719
    def test_local_abspath(self):
766
720
        here = osutils.abspath('.')
767
721
        t = transport.get_transport(here)
768
 
        self.assertEqual(t.local_abspath(''), here)
769
 
 
770
 
 
771
 
class TestLocalTransportMutation(tests.TestCaseInTempDir):
772
 
 
773
 
    def test_local_transport_mkdir(self):
774
 
        here = osutils.abspath('.')
775
 
        t = transport.get_transport(here)
776
 
        t.mkdir('test')
777
 
        self.assertTrue(os.path.exists('test'))
778
 
 
779
 
    def test_local_transport_mkdir_permission_denied(self):
780
 
        # See https://bugs.launchpad.net/bzr/+bug/606537
781
 
        here = osutils.abspath('.')
782
 
        t = transport.get_transport(here)
783
 
 
784
 
        def fake_chmod(path, mode):
785
 
            e = OSError('permission denied')
786
 
            e.errno = errno.EPERM
787
 
            raise e
788
 
        self.overrideAttr(os, 'chmod', fake_chmod)
789
 
        t.mkdir('test')
790
 
        t.mkdir('test2', mode=0o707)
791
 
        self.assertTrue(os.path.exists('test'))
792
 
        self.assertTrue(os.path.exists('test2'))
793
 
 
794
 
 
795
 
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
796
 
 
797
 
    def test_local_fdatasync_calls_fdatasync(self):
798
 
        """Check fdatasync on a stream tries to flush the data to the OS.
799
 
 
800
 
        We can't easily observe the external effect but we can at least see
801
 
        it's called.
802
 
        """
803
 
        sentinel = object()
804
 
        fdatasync = getattr(os, 'fdatasync', sentinel)
805
 
        if fdatasync is sentinel:
806
 
            raise tests.TestNotApplicable('fdatasync not supported')
807
 
        t = self.get_transport('.')
808
 
        calls = self.recordCalls(os, 'fdatasync')
809
 
        w = t.open_write_stream('out')
810
 
        w.write(b'foo')
811
 
        w.fdatasync()
812
 
        with open('out', 'rb') as f:
813
 
            # Should have been flushed.
814
 
            self.assertEqual(f.read(), b'foo')
815
 
        self.assertEqual(len(calls), 1, calls)
816
 
 
817
 
    def test_missing_directory(self):
818
 
        t = self.get_transport('.')
819
 
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
 
722
        self.assertEquals(t.local_abspath(''), here)
820
723
 
821
724
 
822
725
class TestWin32LocalTransport(tests.TestCase):
823
726
 
824
727
    def test_unc_clone_to_root(self):
825
 
        self.requireFeature(features.win32_feature)
826
728
        # Win32 UNC path like \\HOST\path
827
729
        # clone to root should stop at least at \\HOST part
828
730
        # not on \\
829
731
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
830
 
        for i in range(4):
 
732
        for i in xrange(4):
831
733
            t = t.clone('..')
832
 
        self.assertEqual(t.base, 'file://HOST/')
 
734
        self.assertEquals(t.base, 'file://HOST/')
833
735
        # make sure we reach the root
834
736
        t = t.clone('..')
835
 
        self.assertEqual(t.base, 'file://HOST/')
 
737
        self.assertEquals(t.base, 'file://HOST/')
836
738
 
837
739
 
838
740
class TestConnectedTransport(tests.TestCase):
841
743
    def test_parse_url(self):
842
744
        t = transport.ConnectedTransport(
843
745
            'http://simple.example.com/home/source')
844
 
        self.assertEqual(t._parsed_url.host, 'simple.example.com')
845
 
        self.assertEqual(t._parsed_url.port, None)
846
 
        self.assertEqual(t._parsed_url.path, '/home/source/')
847
 
        self.assertTrue(t._parsed_url.user is None)
848
 
        self.assertTrue(t._parsed_url.password is None)
 
746
        self.assertEquals(t._host, 'simple.example.com')
 
747
        self.assertEquals(t._port, None)
 
748
        self.assertEquals(t._path, '/home/source/')
 
749
        self.failUnless(t._user is None)
 
750
        self.failUnless(t._password is None)
849
751
 
850
 
        self.assertEqual(t.base, 'http://simple.example.com/home/source/')
 
752
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
851
753
 
852
754
    def test_parse_url_with_at_in_user(self):
853
755
        # Bug 228058
854
756
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
855
 
        self.assertEqual(t._parsed_url.user, 'user@host.com')
 
757
        self.assertEquals(t._user, 'user@host.com')
856
758
 
857
759
    def test_parse_quoted_url(self):
858
760
        t = transport.ConnectedTransport(
859
761
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
860
 
        self.assertEqual(t._parsed_url.host, 'exAmple.com')
861
 
        self.assertEqual(t._parsed_url.port, 2222)
862
 
        self.assertEqual(t._parsed_url.user, 'robey')
863
 
        self.assertEqual(t._parsed_url.password, 'h@t')
864
 
        self.assertEqual(t._parsed_url.path, '/path/')
 
762
        self.assertEquals(t._host, 'exAmple.com')
 
763
        self.assertEquals(t._port, 2222)
 
764
        self.assertEquals(t._user, 'robey')
 
765
        self.assertEquals(t._password, 'h@t')
 
766
        self.assertEquals(t._path, '/path/')
865
767
 
866
768
        # Base should not keep track of the password
867
 
        self.assertEqual(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
 
769
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
868
770
 
869
771
    def test_parse_invalid_url(self):
870
 
        self.assertRaises(urlutils.InvalidURL,
 
772
        self.assertRaises(errors.InvalidURL,
871
773
                          transport.ConnectedTransport,
872
774
                          'sftp://lily.org:~janneke/public/bzr/gub')
873
775
 
874
776
    def test_relpath(self):
875
777
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
876
778
 
877
 
        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
878
 
                         'sub')
 
779
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
879
780
        self.assertRaises(errors.PathNotChild, t.relpath,
880
781
                          'http://user@host.com/abs/path/sub')
881
782
        self.assertRaises(errors.PathNotChild, t.relpath,
886
787
                          'sftp://user@host.com:33/abs/path/sub')
887
788
        # Make sure it works when we don't supply a username
888
789
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
889
 
        self.assertEqual(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
790
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
890
791
 
891
792
        # Make sure it works when parts of the path will be url encoded
892
793
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
893
 
        self.assertEqual(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
794
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
894
795
 
895
796
    def test_connection_sharing_propagate_credentials(self):
896
797
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
897
 
        self.assertEqual('user', t._parsed_url.user)
898
 
        self.assertEqual('host.com', t._parsed_url.host)
 
798
        self.assertEquals('user', t._user)
 
799
        self.assertEquals('host.com', t._host)
899
800
        self.assertIs(None, t._get_connection())
900
 
        self.assertIs(None, t._parsed_url.password)
 
801
        self.assertIs(None, t._password)
901
802
        c = t.clone('subdir')
902
803
        self.assertIs(None, c._get_connection())
903
 
        self.assertIs(None, t._parsed_url.password)
 
804
        self.assertIs(None, t._password)
904
805
 
905
806
        # Simulate the user entering a password
906
807
        password = 'secret'
925
826
 
926
827
    def test_reuse_same_transport(self):
927
828
        possible_transports = []
928
 
        t1 = transport.get_transport_from_url(
929
 
            'http://foo/', possible_transports=possible_transports)
 
829
        t1 = transport.get_transport('http://foo/',
 
830
                                     possible_transports=possible_transports)
930
831
        self.assertEqual([t1], possible_transports)
931
 
        t2 = transport.get_transport_from_url('http://foo/',
932
 
                                              possible_transports=[t1])
 
832
        t2 = transport.get_transport('http://foo/',
 
833
                                     possible_transports=[t1])
933
834
        self.assertIs(t1, t2)
934
835
 
935
836
        # Also check that final '/' are handled correctly
936
 
        t3 = transport.get_transport_from_url('http://foo/path/')
937
 
        t4 = transport.get_transport_from_url('http://foo/path',
938
 
                                              possible_transports=[t3])
 
837
        t3 = transport.get_transport('http://foo/path/')
 
838
        t4 = transport.get_transport('http://foo/path',
 
839
                                     possible_transports=[t3])
939
840
        self.assertIs(t3, t4)
940
841
 
941
 
        t5 = transport.get_transport_from_url('http://foo/path')
942
 
        t6 = transport.get_transport_from_url('http://foo/path/',
943
 
                                              possible_transports=[t5])
 
842
        t5 = transport.get_transport('http://foo/path')
 
843
        t6 = transport.get_transport('http://foo/path/',
 
844
                                     possible_transports=[t5])
944
845
        self.assertIs(t5, t6)
945
846
 
946
847
    def test_don_t_reuse_different_transport(self):
947
 
        t1 = transport.get_transport_from_url('http://foo/path')
948
 
        t2 = transport.get_transport_from_url('http://bar/path',
949
 
                                              possible_transports=[t1])
 
848
        t1 = transport.get_transport('http://foo/path')
 
849
        t2 = transport.get_transport('http://bar/path',
 
850
                                     possible_transports=[t1])
950
851
        self.assertIsNot(t1, t2)
951
852
 
952
853
 
953
854
class TestTransportTrace(tests.TestCase):
954
855
 
955
 
    def test_decorator(self):
956
 
        t = transport.get_transport_from_url('trace+memory://')
957
 
        self.assertIsInstance(
958
 
            t, breezy.transport.trace.TransportTraceDecorator)
 
856
    def test_get(self):
 
857
        t = transport.get_transport('trace+memory://')
 
858
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
959
859
 
960
860
    def test_clone_preserves_activity(self):
961
 
        t = transport.get_transport_from_url('trace+memory://')
 
861
        t = transport.get_transport('trace+memory://')
962
862
        t2 = t.clone('.')
963
863
        self.assertTrue(t is not t2)
964
864
        self.assertTrue(t._activity is t2._activity)
968
868
    # still won't cause a test failure when the top level Transport API
969
869
    # changes; so there is little return doing that.
970
870
    def test_get(self):
971
 
        t = transport.get_transport_from_url('trace+memory:///')
972
 
        t.put_bytes('foo', b'barish')
 
871
        t = transport.get_transport('trace+memory:///')
 
872
        t.put_bytes('foo', 'barish')
973
873
        t.get('foo')
974
874
        expected_result = []
975
875
        # put_bytes records the bytes, not the content to avoid memory
980
880
        self.assertEqual(expected_result, t._activity)
981
881
 
982
882
    def test_readv(self):
983
 
        t = transport.get_transport_from_url('trace+memory:///')
984
 
        t.put_bytes('foo', b'barish')
 
883
        t = transport.get_transport('trace+memory:///')
 
884
        t.put_bytes('foo', 'barish')
985
885
        list(t.readv('foo', [(0, 1), (3, 2)],
986
886
                     adjust_for_latency=True, upper_limit=6))
987
887
        expected_result = []
996
896
class TestSSHConnections(tests.TestCaseWithTransport):
997
897
 
998
898
    def test_bzr_connect_to_bzr_ssh(self):
999
 
        """get_transport of a bzr+ssh:// behaves correctly.
 
899
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
1000
900
 
1001
901
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
1002
902
        """
1009
909
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
1010
910
        # override the interface (doesn't change self._vendor).
1011
911
        # Note that this does encryption, so can be slow.
1012
 
        from breezy.tests import stub_sftp
 
912
        from bzrlib.tests import stub_sftp
1013
913
 
1014
914
        # Start an SSH server
1015
915
        self.command_executed = []
1018
918
        # SSH channel ourselves.  Surely this has already been implemented
1019
919
        # elsewhere?
1020
920
        started = []
1021
 
 
1022
921
        class StubSSHServer(stub_sftp.StubServer):
1023
922
 
1024
923
            test = self
1027
926
                self.test.command_executed.append(command)
1028
927
                proc = subprocess.Popen(
1029
928
                    command, shell=True, stdin=subprocess.PIPE,
1030
 
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
1031
 
                    bufsize=0)
 
929
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1032
930
 
1033
931
                # XXX: horribly inefficient, not to mention ugly.
1034
 
                # Start a thread for each of stdin/out/err, and relay bytes
1035
 
                # from the subprocess to channel and vice versa.
 
932
                # Start a thread for each of stdin/out/err, and relay bytes from
 
933
                # the subprocess to channel and vice versa.
1036
934
                def ferry_bytes(read, write, close):
1037
935
                    while True:
1038
936
                        bytes = read(1)
1039
 
                        if bytes == b'':
 
937
                        if bytes == '':
1040
938
                            close()
1041
939
                            break
1042
940
                        write(bytes)
1057
955
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
1058
956
        # We *don't* want to override the default SSH vendor: the detected one
1059
957
        # is the one to use.
1060
 
 
1061
 
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
1062
 
        # inherits from SFTPServer which forces the SSH vendor to
1063
 
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
1064
958
        self.start_server(ssh_server)
1065
 
        port = ssh_server.port
 
959
        port = ssh_server._listener.port
1066
960
 
1067
961
        if sys.platform == 'win32':
1068
 
            bzr_remote_path = sys.executable + ' ' + self.get_brz_path()
 
962
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
1069
963
        else:
1070
 
            bzr_remote_path = self.get_brz_path()
1071
 
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
 
964
            bzr_remote_path = self.get_bzr_path()
 
965
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
1072
966
 
1073
967
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
1074
968
        # variable is used to tell bzr what command to run on the remote end.
1083
977
        t.mkdir('foo')
1084
978
 
1085
979
        self.assertEqual(
1086
 
            [b'%s serve --inet --directory=/ --allow-writes' %
1087
 
                bzr_remote_path.encode()],
 
980
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
1088
981
            self.command_executed)
1089
982
        # Make sure to disconnect, so that the remote process can stop, and we
1090
983
        # can cleanup. Then pause the test until everything is shutdown
1096
989
        # And the rest are threads
1097
990
        for t in started[1:]:
1098
991
            t.join()
1099
 
 
1100
 
 
1101
 
class TestUnhtml(tests.TestCase):
1102
 
 
1103
 
    """Tests for unhtml_roughly"""
1104
 
 
1105
 
    def test_truncation(self):
1106
 
        fake_html = "<p>something!\n" * 1000
1107
 
        result = http.unhtml_roughly(fake_html)
1108
 
        self.assertEqual(len(result), 1000)
1109
 
        self.assertStartsWith(result, " something!")