/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2019-10-13 22:53:02 UTC
  • mfrom: (7290.1.35 work)
  • mto: This revision was merged to the branch mainline in revision 7405.
  • Revision ID: jelmer@jelmer.uk-20191013225302-vg88ztajzq05hkas
Merge lp:brz/3.0.

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
 
18
 
from cStringIO import StringIO
 
18
import errno
19
19
import os
20
20
import subprocess
21
21
import sys
22
22
import threading
23
23
 
24
 
from bzrlib import (
 
24
from .. import (
25
25
    errors,
26
26
    osutils,
27
27
    tests,
28
28
    transport,
29
29
    urlutils,
30
30
    )
31
 
from bzrlib.transport import (
 
31
from ..sixish import (
 
32
    BytesIO,
 
33
    )
 
34
from ..transport import (
32
35
    chroot,
33
36
    fakenfs,
 
37
    http,
34
38
    local,
35
39
    memory,
36
40
    pathfilter,
37
41
    readonly,
38
42
    )
39
 
from bzrlib.tests import (
 
43
import breezy.transport.trace
 
44
from . import (
40
45
    features,
41
46
    test_server,
42
47
    )
48
53
class TestTransport(tests.TestCase):
49
54
    """Test the non transport-concrete class functionality."""
50
55
 
51
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
52
 
    # of try/finally -- vila 20100205
53
 
 
54
56
    def test__get_set_protocol_handlers(self):
55
57
        handlers = transport._get_protocol_handlers()
56
 
        self.assertNotEqual([], handlers.keys( ))
57
 
        try:
58
 
            transport._clear_protocol_handlers()
59
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
60
 
        finally:
61
 
            transport._set_protocol_handlers(handlers)
 
58
        self.assertNotEqual([], handlers.keys())
 
59
        transport._clear_protocol_handlers()
 
60
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
61
        self.assertEqual([], transport._get_protocol_handlers().keys())
62
62
 
63
63
    def test_get_transport_modules(self):
64
64
        handlers = transport._get_protocol_handlers()
 
65
        self.addCleanup(transport._set_protocol_handlers, handlers)
65
66
        # don't pollute the current handlers
66
67
        transport._clear_protocol_handlers()
 
68
 
67
69
        class SampleHandler(object):
68
70
            """I exist, isnt that enough?"""
69
 
        try:
70
 
            transport._clear_protocol_handlers()
71
 
            transport.register_transport_proto('foo')
72
 
            transport.register_lazy_transport('foo',
73
 
                                              'bzrlib.tests.test_transport',
74
 
                                              'TestTransport.SampleHandler')
75
 
            transport.register_transport_proto('bar')
76
 
            transport.register_lazy_transport('bar',
77
 
                                              'bzrlib.tests.test_transport',
78
 
                                              'TestTransport.SampleHandler')
79
 
            self.assertEqual([SampleHandler.__module__,
80
 
                              'bzrlib.transport.chroot',
81
 
                              'bzrlib.transport.pathfilter'],
82
 
                             transport._get_transport_modules())
83
 
        finally:
84
 
            transport._set_protocol_handlers(handlers)
 
71
        transport._clear_protocol_handlers()
 
72
        transport.register_transport_proto('foo')
 
73
        transport.register_lazy_transport('foo',
 
74
                                          'breezy.tests.test_transport',
 
75
                                          'TestTransport.SampleHandler')
 
76
        transport.register_transport_proto('bar')
 
77
        transport.register_lazy_transport('bar',
 
78
                                          'breezy.tests.test_transport',
 
79
                                          'TestTransport.SampleHandler')
 
80
        self.assertEqual([SampleHandler.__module__,
 
81
                          'breezy.transport.chroot',
 
82
                          'breezy.transport.pathfilter'],
 
83
                         transport._get_transport_modules())
85
84
 
86
85
    def test_transport_dependency(self):
87
86
        """Transport with missing dependency causes no error"""
88
87
        saved_handlers = transport._get_protocol_handlers()
 
88
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
89
89
        # don't pollute the current handlers
90
90
        transport._clear_protocol_handlers()
 
91
        transport.register_transport_proto('foo')
 
92
        transport.register_lazy_transport(
 
93
            'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
91
94
        try:
92
 
            transport.register_transport_proto('foo')
93
 
            transport.register_lazy_transport(
94
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
95
 
            try:
96
 
                transport.get_transport('foo://fooserver/foo')
97
 
            except errors.UnsupportedProtocol, e:
98
 
                e_str = str(e)
99
 
                self.assertEquals('Unsupported protocol'
100
 
                                  ' for url "foo://fooserver/foo":'
101
 
                                  ' Unable to import library "some_lib":'
102
 
                                  ' testing missing dependency', str(e))
103
 
            else:
104
 
                self.fail('Did not raise UnsupportedProtocol')
105
 
        finally:
106
 
            # restore original values
107
 
            transport._set_protocol_handlers(saved_handlers)
 
95
            transport.get_transport_from_url('foo://fooserver/foo')
 
96
        except errors.UnsupportedProtocol as e:
 
97
            self.assertEqual('Unsupported protocol'
 
98
                             ' for url "foo://fooserver/foo":'
 
99
                             ' Unable to import library "some_lib":'
 
100
                             ' testing missing dependency', str(e))
 
101
        else:
 
102
            self.fail('Did not raise UnsupportedProtocol')
108
103
 
109
104
    def test_transport_fallback(self):
110
105
        """Transport with missing dependency causes no error"""
111
106
        saved_handlers = transport._get_protocol_handlers()
112
 
        try:
113
 
            transport._clear_protocol_handlers()
114
 
            transport.register_transport_proto('foo')
115
 
            transport.register_lazy_transport(
116
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
117
 
            transport.register_lazy_transport(
118
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
119
 
            t = transport.get_transport('foo://fooserver/foo')
120
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
121
 
        finally:
122
 
            transport._set_protocol_handlers(saved_handlers)
 
107
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
108
        transport._clear_protocol_handlers()
 
109
        transport.register_transport_proto('foo')
 
110
        transport.register_lazy_transport(
 
111
            'foo', 'breezy.tests.test_transport', 'BackupTransportHandler')
 
112
        transport.register_lazy_transport(
 
113
            'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
 
114
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
115
        self.assertTrue(isinstance(t, BackupTransportHandler))
123
116
 
124
117
    def test_ssh_hints(self):
125
118
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
126
119
        try:
127
 
            transport.get_transport('ssh://fooserver/foo')
128
 
        except errors.UnsupportedProtocol, e:
129
 
            e_str = str(e)
130
 
            self.assertEquals('Unsupported protocol'
131
 
                              ' for url "ssh://fooserver/foo":'
132
 
                              ' bzr supports bzr+ssh to operate over ssh,'
133
 
                              ' use "bzr+ssh://fooserver/foo".',
134
 
                              str(e))
 
120
            transport.get_transport_from_url('ssh://fooserver/foo')
 
121
        except errors.UnsupportedProtocol as e:
 
122
            self.assertEqual(
 
123
                'Unsupported protocol'
 
124
                ' for url "ssh://fooserver/foo":'
 
125
                ' Use bzr+ssh for Bazaar operations over SSH, '
 
126
                'e.g. "bzr+ssh://fooserver/foo". Use git+ssh '
 
127
                'for Git operations over SSH, e.g. "git+ssh://fooserver/foo".',
 
128
                str(e))
135
129
        else:
136
130
            self.fail('Did not raise UnsupportedProtocol')
137
131
 
140
134
        a_file = transport.LateReadError('a path')
141
135
        try:
142
136
            a_file.read()
143
 
        except errors.ReadError, error:
 
137
        except errors.ReadError as error:
144
138
            self.assertEqual('a path', error.path)
145
139
        self.assertRaises(errors.ReadError, a_file.read, 40)
146
140
        a_file.close()
147
141
 
148
 
    def test__combine_paths(self):
149
 
        t = transport.Transport('/')
150
 
        self.assertEqual('/home/sarah/project/foo',
151
 
                         t._combine_paths('/home/sarah', 'project/foo'))
152
 
        self.assertEqual('/etc',
153
 
                         t._combine_paths('/home/sarah', '../../etc'))
154
 
        self.assertEqual('/etc',
155
 
                         t._combine_paths('/home/sarah', '../../../etc'))
156
 
        self.assertEqual('/etc',
157
 
                         t._combine_paths('/home/sarah', '/etc'))
158
 
 
159
142
    def test_local_abspath_non_local_transport(self):
160
143
        # the base implementation should throw
161
144
        t = memory.MemoryTransport()
181
164
    def test_coalesce_unrelated(self):
182
165
        self.check([(0, 10, [(0, 10)]),
183
166
                    (20, 10, [(0, 10)]),
184
 
                   ], [(0, 10), (20, 10)])
 
167
                    ], [(0, 10), (20, 10)])
185
168
 
186
169
    def test_coalesce_unsorted(self):
187
170
        self.check([(20, 10, [(0, 10)]),
188
171
                    (0, 10, [(0, 10)]),
189
 
                   ], [(20, 10), (0, 10)])
 
172
                    ], [(20, 10), (0, 10)])
190
173
 
191
174
    def test_coalesce_nearby(self):
192
175
        self.check([(0, 20, [(0, 10), (10, 10)])],
194
177
 
195
178
    def test_coalesce_overlapped(self):
196
179
        self.assertRaises(ValueError,
197
 
            self.check, [(0, 15, [(0, 10), (5, 10)])],
198
 
                        [(0, 10), (5, 10)])
 
180
                          self.check, [(0, 15, [(0, 10), (5, 10)])],
 
181
                          [(0, 10), (5, 10)])
199
182
 
200
183
    def test_coalesce_limit(self):
201
184
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
202
185
                              (30, 10), (40, 10)]),
203
186
                    (60, 50, [(0, 10), (10, 10), (20, 10),
204
187
                              (30, 10), (40, 10)]),
205
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
206
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
207
 
                       (90, 10), (100, 10)],
208
 
                    limit=5)
 
188
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
189
                        (50, 10), (60, 10), (70, 10), (80, 10),
 
190
                        (90, 10), (100, 10)],
 
191
                   limit=5)
209
192
 
210
193
    def test_coalesce_no_limit(self):
211
194
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
212
195
                               (30, 10), (40, 10), (50, 10),
213
196
                               (60, 10), (70, 10), (80, 10),
214
197
                               (90, 10)]),
215
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
216
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
217
 
                       (90, 10), (100, 10)])
 
198
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
199
                        (50, 10), (60, 10), (70, 10), (80, 10),
 
200
                        (90, 10), (100, 10)])
218
201
 
219
202
    def test_coalesce_fudge(self):
220
203
        self.check([(10, 30, [(0, 10), (20, 10)]),
221
 
                    (100, 10, [(0, 10),]),
222
 
                   ], [(10, 10), (30, 10), (100, 10)],
223
 
                   fudge=10
224
 
                  )
 
204
                    (100, 10, [(0, 10)]),
 
205
                    ], [(10, 10), (30, 10), (100, 10)],
 
206
                   fudge=10)
 
207
 
225
208
    def test_coalesce_max_size(self):
226
209
        self.check([(10, 20, [(0, 10), (10, 10)]),
227
210
                    (30, 50, [(0, 50)]),
228
211
                    # If one range is above max_size, it gets its own coalesced
229
212
                    # offset
230
 
                    (100, 80, [(0, 80),]),],
 
213
                    (100, 80, [(0, 80)]), ],
231
214
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
232
 
                   max_size=50
233
 
                  )
 
215
                   max_size=50)
234
216
 
235
217
    def test_coalesce_no_max_size(self):
236
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
218
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
237
219
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
238
 
                  )
 
220
                   )
239
221
 
240
222
    def test_coalesce_default_limit(self):
241
223
        # By default we use a 100MB max size.
242
 
        ten_mb = 10*1024*1024
243
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
244
 
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
245
 
                   [(i*ten_mb, ten_mb) for i in range(11)])
246
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
247
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
248
 
                   max_size=1*1024*1024*1024)
 
224
        ten_mb = 10 * 1024 * 1024
 
225
        self.check(
 
226
            [(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
227
             (10 * ten_mb, ten_mb, [(0, ten_mb)])],
 
228
            [(i * ten_mb, ten_mb) for i in range(11)])
 
229
        self.check(
 
230
            [(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
231
            [(i * ten_mb, ten_mb) for i in range(11)],
 
232
            max_size=1 * 1024 * 1024 * 1024)
249
233
 
250
234
 
251
235
class TestMemoryServer(tests.TestCase):
255
239
        server.start_server()
256
240
        url = server.get_url()
257
241
        self.assertTrue(url in transport.transport_list_registry)
258
 
        t = transport.get_transport(url)
 
242
        t = transport.get_transport_from_url(url)
259
243
        del t
260
244
        server.stop_server()
261
245
        self.assertFalse(url in transport.transport_list_registry)
288
272
 
289
273
    def test_append_and_get(self):
290
274
        t = memory.MemoryTransport()
291
 
        t.append_bytes('path', 'content')
292
 
        self.assertEqual(t.get('path').read(), 'content')
293
 
        t.append_file('path', StringIO('content'))
294
 
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
275
        t.append_bytes('path', b'content')
 
276
        self.assertEqual(t.get('path').read(), b'content')
 
277
        t.append_file('path', BytesIO(b'content'))
 
278
        with t.get('path') as f:
 
279
            self.assertEqual(f.read(), b'contentcontent')
295
280
 
296
281
    def test_put_and_get(self):
297
282
        t = memory.MemoryTransport()
298
 
        t.put_file('path', StringIO('content'))
299
 
        self.assertEqual(t.get('path').read(), 'content')
300
 
        t.put_bytes('path', 'content')
301
 
        self.assertEqual(t.get('path').read(), 'content')
 
283
        t.put_file('path', BytesIO(b'content'))
 
284
        self.assertEqual(t.get('path').read(), b'content')
 
285
        t.put_bytes('path', b'content')
 
286
        self.assertEqual(t.get('path').read(), b'content')
302
287
 
303
288
    def test_append_without_dir_fails(self):
304
289
        t = memory.MemoryTransport()
305
290
        self.assertRaises(errors.NoSuchFile,
306
 
                          t.append_bytes, 'dir/path', 'content')
 
291
                          t.append_bytes, 'dir/path', b'content')
307
292
 
308
293
    def test_put_without_dir_fails(self):
309
294
        t = memory.MemoryTransport()
310
295
        self.assertRaises(errors.NoSuchFile,
311
 
                          t.put_file, 'dir/path', StringIO('content'))
 
296
                          t.put_file, 'dir/path', BytesIO(b'content'))
312
297
 
313
298
    def test_get_missing(self):
314
299
        transport = memory.MemoryTransport()
316
301
 
317
302
    def test_has_missing(self):
318
303
        t = memory.MemoryTransport()
319
 
        self.assertEquals(False, t.has('foo'))
 
304
        self.assertEqual(False, t.has('foo'))
320
305
 
321
306
    def test_has_present(self):
322
307
        t = memory.MemoryTransport()
323
 
        t.append_bytes('foo', 'content')
324
 
        self.assertEquals(True, t.has('foo'))
 
308
        t.append_bytes('foo', b'content')
 
309
        self.assertEqual(True, t.has('foo'))
325
310
 
326
311
    def test_list_dir(self):
327
312
        t = memory.MemoryTransport()
328
 
        t.put_bytes('foo', 'content')
 
313
        t.put_bytes('foo', b'content')
329
314
        t.mkdir('dir')
330
 
        t.put_bytes('dir/subfoo', 'content')
331
 
        t.put_bytes('dirlike', 'content')
 
315
        t.put_bytes('dir/subfoo', b'content')
 
316
        t.put_bytes('dirlike', b'content')
332
317
 
333
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
334
 
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
318
        self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
319
        self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
335
320
 
336
321
    def test_mkdir(self):
337
322
        t = memory.MemoryTransport()
338
323
        t.mkdir('dir')
339
 
        t.append_bytes('dir/path', 'content')
340
 
        self.assertEqual(t.get('dir/path').read(), 'content')
 
324
        t.append_bytes('dir/path', b'content')
 
325
        with t.get('dir/path') as f:
 
326
            self.assertEqual(f.read(), b'content')
341
327
 
342
328
    def test_mkdir_missing_parent(self):
343
329
        t = memory.MemoryTransport()
356
342
    def test_iter_files_recursive(self):
357
343
        t = memory.MemoryTransport()
358
344
        t.mkdir('dir')
359
 
        t.put_bytes('dir/foo', 'content')
360
 
        t.put_bytes('dir/bar', 'content')
361
 
        t.put_bytes('bar', 'content')
 
345
        t.put_bytes('dir/foo', b'content')
 
346
        t.put_bytes('dir/bar', b'content')
 
347
        t.put_bytes('bar', b'content')
362
348
        paths = set(t.iter_files_recursive())
363
 
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
349
        self.assertEqual({'dir/foo', 'dir/bar', 'bar'}, paths)
364
350
 
365
351
    def test_stat(self):
366
352
        t = memory.MemoryTransport()
367
 
        t.put_bytes('foo', 'content')
368
 
        t.put_bytes('bar', 'phowar')
 
353
        t.put_bytes('foo', b'content')
 
354
        t.put_bytes('bar', b'phowar')
369
355
        self.assertEqual(7, t.stat('foo').st_size)
370
356
        self.assertEqual(6, t.stat('bar').st_size)
371
357
 
376
362
    def test_abspath(self):
377
363
        # The abspath is always relative to the chroot_url.
378
364
        server = chroot.ChrootServer(
379
 
            transport.get_transport('memory:///foo/bar/'))
 
365
            transport.get_transport_from_url('memory:///foo/bar/'))
380
366
        self.start_server(server)
381
 
        t = transport.get_transport(server.get_url())
 
367
        t = transport.get_transport_from_url(server.get_url())
382
368
        self.assertEqual(server.get_url(), t.abspath('/'))
383
369
 
384
370
        subdir_t = t.clone('subdir')
386
372
 
387
373
    def test_clone(self):
388
374
        server = chroot.ChrootServer(
389
 
            transport.get_transport('memory:///foo/bar/'))
 
375
            transport.get_transport_from_url('memory:///foo/bar/'))
390
376
        self.start_server(server)
391
 
        t = transport.get_transport(server.get_url())
 
377
        t = transport.get_transport_from_url(server.get_url())
392
378
        # relpath from root and root path are the same
393
379
        relpath_cloned = t.clone('foo')
394
380
        abspath_cloned = t.clone('/foo')
403
389
        This is so that it is not possible to escape a chroot by doing::
404
390
            url = chroot_transport.base
405
391
            parent_url = urlutils.join(url, '..')
406
 
            new_t = transport.get_transport(parent_url)
 
392
            new_t = transport.get_transport_from_url(parent_url)
407
393
        """
408
394
        server = chroot.ChrootServer(
409
 
            transport.get_transport('memory:///path/subpath'))
 
395
            transport.get_transport_from_url('memory:///path/subpath'))
410
396
        self.start_server(server)
411
 
        t = transport.get_transport(server.get_url())
412
 
        new_t = transport.get_transport(t.base)
 
397
        t = transport.get_transport_from_url(server.get_url())
 
398
        new_t = transport.get_transport_from_url(t.base)
413
399
        self.assertEqual(t.server, new_t.server)
414
400
        self.assertEqual(t.base, new_t.base)
415
401
 
420
406
        This is so that it is not possible to escape a chroot by doing::
421
407
            url = chroot_transport.base
422
408
            parent_url = urlutils.join(url, '..')
423
 
            new_t = transport.get_transport(parent_url)
 
409
            new_t = transport.get_transport_from_url(parent_url)
424
410
        """
425
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
411
        server = chroot.ChrootServer(
 
412
            transport.get_transport_from_url('memory:///path/'))
426
413
        self.start_server(server)
427
 
        t = transport.get_transport(server.get_url())
 
414
        t = transport.get_transport_from_url(server.get_url())
428
415
        self.assertRaises(
429
 
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
 
416
            urlutils.InvalidURLJoin, urlutils.join, t.base, '..')
430
417
 
431
418
 
432
419
class TestChrootServer(tests.TestCase):
440
427
        backing_transport = memory.MemoryTransport()
441
428
        server = chroot.ChrootServer(backing_transport)
442
429
        server.start_server()
443
 
        try:
444
 
            self.assertTrue(server.scheme
445
 
                            in transport._get_protocol_handlers().keys())
446
 
        finally:
447
 
            server.stop_server()
 
430
        self.addCleanup(server.stop_server)
 
431
        self.assertTrue(server.scheme
 
432
                        in transport._get_protocol_handlers().keys())
448
433
 
449
434
    def test_stop_server(self):
450
435
        backing_transport = memory.MemoryTransport()
458
443
        backing_transport = memory.MemoryTransport()
459
444
        server = chroot.ChrootServer(backing_transport)
460
445
        server.start_server()
461
 
        try:
462
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
463
 
        finally:
464
 
            server.stop_server()
 
446
        self.addCleanup(server.stop_server)
 
447
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
448
 
 
449
 
 
450
class TestHooks(tests.TestCase):
 
451
    """Basic tests for transport hooks"""
 
452
 
 
453
    def _get_connected_transport(self):
 
454
        return transport.ConnectedTransport("bogus:nowhere")
 
455
 
 
456
    def test_transporthooks_initialisation(self):
 
457
        """Check all expected transport hook points are set up"""
 
458
        hookpoint = transport.TransportHooks()
 
459
        self.assertTrue("post_connect" in hookpoint,
 
460
                        "post_connect not in %s" % (hookpoint,))
 
461
 
 
462
    def test_post_connect(self):
 
463
        """Ensure the post_connect hook is called when _set_transport is"""
 
464
        calls = []
 
465
        transport.Transport.hooks.install_named_hook("post_connect",
 
466
                                                     calls.append, None)
 
467
        t = self._get_connected_transport()
 
468
        self.assertLength(0, calls)
 
469
        t._set_connection("connection", "auth")
 
470
        self.assertEqual(calls, [t])
465
471
 
466
472
 
467
473
class PathFilteringDecoratorTransportTest(tests.TestCase):
470
476
    def test_abspath(self):
471
477
        # The abspath is always relative to the base of the backing transport.
472
478
        server = pathfilter.PathFilteringServer(
473
 
            transport.get_transport('memory:///foo/bar/'),
 
479
            transport.get_transport_from_url('memory:///foo/bar/'),
474
480
            lambda x: x)
475
481
        server.start_server()
476
 
        t = transport.get_transport(server.get_url())
 
482
        t = transport.get_transport_from_url(server.get_url())
477
483
        self.assertEqual(server.get_url(), t.abspath('/'))
478
484
 
479
485
        subdir_t = t.clone('subdir')
482
488
 
483
489
    def make_pf_transport(self, filter_func=None):
484
490
        """Make a PathFilteringTransport backed by a MemoryTransport.
485
 
        
 
491
 
486
492
        :param filter_func: by default this will be a no-op function.  Use this
487
493
            parameter to override it."""
488
494
        if filter_func is None:
489
 
            filter_func = lambda x: x
 
495
            def filter_func(x):
 
496
                return x
490
497
        server = pathfilter.PathFilteringServer(
491
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
498
            transport.get_transport_from_url('memory:///foo/bar/'),
 
499
            filter_func)
492
500
        server.start_server()
493
501
        self.addCleanup(server.stop_server)
494
 
        return transport.get_transport(server.get_url())
 
502
        return transport.get_transport_from_url(server.get_url())
495
503
 
496
504
    def test__filter(self):
497
505
        # _filter (with an identity func as filter_func) always returns
510
518
 
511
519
    def test_filter_invocation(self):
512
520
        filter_log = []
 
521
 
513
522
        def filter(path):
514
523
            filter_log.append(path)
515
524
            return path
540
549
        otherwise) the filtering by doing::
541
550
            url = filtered_transport.base
542
551
            parent_url = urlutils.join(url, '..')
543
 
            new_t = transport.get_transport(parent_url)
 
552
            new_t = transport.get_transport_from_url(parent_url)
544
553
        """
545
554
        t = self.make_pf_transport()
546
 
        new_t = transport.get_transport(t.base)
 
555
        new_t = transport.get_transport_from_url(t.base)
547
556
        self.assertEqual(t.server, new_t.server)
548
557
        self.assertEqual(t.base, new_t.base)
549
558
 
558
567
        self.assertEqual(True, t.is_readonly())
559
568
 
560
569
    def test_http_parameters(self):
561
 
        from bzrlib.tests.http_server import HttpServer
 
570
        from breezy.tests.http_server import HttpServer
562
571
        # connect to '.' via http which is not listable
563
572
        server = HttpServer()
564
573
        self.start_server(server)
565
 
        t = transport.get_transport('readonly+' + server.get_url())
566
 
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
 
574
        t = transport.get_transport_from_url('readonly+' + server.get_url())
 
575
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
567
576
        self.assertEqual(False, t.listable())
568
577
        self.assertEqual(True, t.is_readonly())
569
578
 
585
594
    def test_http_parameters(self):
586
595
        # the listable and is_readonly parameters
587
596
        # are not changed by the fakenfs decorator
588
 
        from bzrlib.tests.http_server import HttpServer
 
597
        from breezy.tests.http_server import HttpServer
589
598
        # connect to '.' via http which is not listable
590
599
        server = HttpServer()
591
600
        self.start_server(server)
601
610
        # the url should be decorated appropriately
602
611
        self.assertStartsWith(server.get_url(), 'fakenfs+')
603
612
        # and we should be able to get a transport for it
604
 
        t = transport.get_transport(server.get_url())
 
613
        t = transport.get_transport_from_url(server.get_url())
605
614
        # which must be a FakeNFSTransportDecorator instance.
606
615
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
607
616
 
619
628
 
620
629
    def get_vfat_transport(self, url):
621
630
        """Return vfat-backed transport for test directory"""
622
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
631
        from breezy.transport.fakevfat import FakeVFATTransportDecorator
623
632
        return FakeVFATTransportDecorator('vfat+' + url)
624
633
 
625
634
    def test_transport_creation(self):
626
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
635
        from breezy.transport.fakevfat import FakeVFATTransportDecorator
627
636
        t = self.get_vfat_transport('.')
628
637
        self.assertIsInstance(t, FakeVFATTransportDecorator)
629
638
 
654
663
 
655
664
    To verify a transport we need a server factory, which is a callable
656
665
    that accepts no parameters and returns an implementation of
657
 
    bzrlib.transport.Server.
 
666
    breezy.transport.Server.
658
667
 
659
668
    That Server is then used to construct transport instances and test
660
669
    the transport via loopback activity.
684
693
        base_url = self._server.get_url()
685
694
        url = self._adjust_url(base_url, relpath)
686
695
        # try getting the transport via the regular interface:
687
 
        t = transport.get_transport(url)
 
696
        t = transport.get_transport_from_url(url)
688
697
        # vila--20070607 if the following are commented out the test suite
689
698
        # still pass. Is this really still needed or was it a forgotten
690
699
        # temporary fix ?
695
704
        return t
696
705
 
697
706
 
 
707
class TestTransportFromPath(tests.TestCaseInTempDir):
 
708
 
 
709
    def test_with_path(self):
 
710
        t = transport.get_transport_from_path(self.test_dir)
 
711
        self.assertIsInstance(t, local.LocalTransport)
 
712
        self.assertEqual(t.base.rstrip("/"),
 
713
                         urlutils.local_path_to_url(self.test_dir))
 
714
 
 
715
    def test_with_url(self):
 
716
        t = transport.get_transport_from_path("file:")
 
717
        self.assertIsInstance(t, local.LocalTransport)
 
718
        self.assertEqual(
 
719
            t.base.rstrip("/"),
 
720
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
721
 
 
722
 
 
723
class TestTransportFromUrl(tests.TestCaseInTempDir):
    """Exercise transport.get_transport_from_url() on local arguments."""

    def test_with_path(self):
        # A bare filesystem path is not a URL and is expected to be rejected.
        self.assertRaises(
            urlutils.InvalidURL,
            transport.get_transport_from_url, self.test_dir)

    def test_with_url(self):
        dir_url = urlutils.local_path_to_url(self.test_dir)
        url_transport = transport.get_transport_from_url(dir_url)
        self.assertIsInstance(url_transport, local.LocalTransport)
        self.assertEqual(url_transport.base.rstrip("/"), dir_url)

    def test_with_url_and_segment_parameters(self):
        # The ",branch=foo" suffix is expected to stay part of the base while
        # file access still resolves against the underlying directory.
        dir_url = urlutils.local_path_to_url(self.test_dir)
        segmented_url = dir_url + ",branch=foo"
        url_transport = transport.get_transport_from_url(segmented_url)
        self.assertIsInstance(url_transport, local.LocalTransport)
        self.assertEqual(url_transport.base.rstrip("/"), segmented_url)
        target = os.path.join(self.test_dir, "afile")
        with open(target, 'w') as out:
            out.write("data")
        self.assertTrue(url_transport.has("afile"))
 
743
 
 
744
 
698
745
class TestLocalTransports(tests.TestCase):
    """Check that transport.get_transport() maps local inputs correctly."""

    def test_get_transport_from_abspath(self):
        # An absolute path should give a LocalTransport based at its URL.
        here = osutils.abspath('.')
        t = transport.get_transport(here)
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')

    def test_get_transport_from_relpath(self):
        # A relative path should give a LocalTransport as well.
        t = transport.get_transport('.')
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')

    def test_get_transport_from_local_url(self):
        # A file URL should round-trip unchanged into the transport base.
        here = osutils.abspath('.')
        here_url = urlutils.local_path_to_url(here) + '/'
        t = transport.get_transport(here_url)
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEqual(t.base, here_url)

    def test_local_abspath(self):
        # local_abspath('') should be the path the transport was built from.
        here = osutils.abspath('.')
        t = transport.get_transport(here)
        self.assertEqual(t.local_abspath(''), here)
 
769
 
 
770
 
 
771
class TestLocalTransportMutation(tests.TestCaseInTempDir):
    """Tests that create directories through a local transport."""

    def test_local_transport_mkdir(self):
        cwd = osutils.abspath('.')
        local_transport = transport.get_transport(cwd)
        local_transport.mkdir('test')
        self.assertTrue(os.path.exists('test'))

    def test_local_transport_mkdir_permission_denied(self):
        # See https://bugs.launchpad.net/bzr/+bug/606537
        cwd = osutils.abspath('.')
        local_transport = transport.get_transport(cwd)

        def fake_chmod(path, mode):
            # Stand-in for os.chmod that always fails with EPERM.
            denied = OSError('permission denied')
            denied.errno = errno.EPERM
            raise denied

        self.overrideAttr(os, 'chmod', fake_chmod)
        # Both the default-mode and the explicit-mode mkdir are expected to
        # succeed even though every chmod attempt now raises.
        local_transport.mkdir('test')
        local_transport.mkdir('test2', mode=0o707)
        for created in ('test', 'test2'):
            self.assertTrue(os.path.exists(created))
 
793
 
 
794
 
 
795
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
    """Tests for the stream returned by open_write_stream() on local disk."""

    def test_local_fdatasync_calls_fdatasync(self):
        """Check fdatasync on a stream tries to flush the data to the OS.

        We can't easily observe the external effect but we can at least see
        it's called.
        """
        if not hasattr(os, 'fdatasync'):
            raise tests.TestNotApplicable('fdatasync not supported')
        t = self.get_transport('.')
        calls = self.recordCalls(os, 'fdatasync')
        w = t.open_write_stream('out')
        # Close the stream at teardown so the handle is not left open; the
        # assertions below still run while the stream is open, which is the
        # point of the test.
        self.addCleanup(w.close)
        w.write(b'foo')
        w.fdatasync()
        with open('out', 'rb') as f:
            # Should have been flushed.
            self.assertEqual(f.read(), b'foo')
        self.assertEqual(len(calls), 1, calls)

    def test_missing_directory(self):
        # Opening a stream below a directory that does not exist must raise
        # NoSuchFile.
        t = self.get_transport('.')
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
723
820
 
724
821
 
725
822
class TestWin32LocalTransport(tests.TestCase):
    """Tests for the emulated Win32 local transport."""

    def test_unc_clone_to_root(self):
        self.requireFeature(features.win32_feature)
        # Win32 UNC path like \\HOST\path
        # clone to root should stop at least at \\HOST part
        # not on \\
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
        # One clone('..') per path component below the host.
        for _ in range(4):
            t = t.clone('..')
        self.assertEqual(t.base, 'file://HOST/')
        # make sure we reach the root
        t = t.clone('..')
        self.assertEqual(t.base, 'file://HOST/')
738
836
 
739
837
 
740
838
class TestConnectedTransport(tests.TestCase):
743
841
    def test_parse_url(self):
744
842
        t = transport.ConnectedTransport(
745
843
            'http://simple.example.com/home/source')
746
 
        self.assertEquals(t._host, 'simple.example.com')
747
 
        self.assertEquals(t._port, None)
748
 
        self.assertEquals(t._path, '/home/source/')
749
 
        self.failUnless(t._user is None)
750
 
        self.failUnless(t._password is None)
 
844
        self.assertEqual(t._parsed_url.host, 'simple.example.com')
 
845
        self.assertEqual(t._parsed_url.port, None)
 
846
        self.assertEqual(t._parsed_url.path, '/home/source/')
 
847
        self.assertTrue(t._parsed_url.user is None)
 
848
        self.assertTrue(t._parsed_url.password is None)
751
849
 
752
 
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
 
850
        self.assertEqual(t.base, 'http://simple.example.com/home/source/')
753
851
 
754
852
    def test_parse_url_with_at_in_user(self):
755
853
        # Bug 228058
756
854
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
757
 
        self.assertEquals(t._user, 'user@host.com')
 
855
        self.assertEqual(t._parsed_url.user, 'user@host.com')
758
856
 
759
857
    def test_parse_quoted_url(self):
760
858
        t = transport.ConnectedTransport(
761
859
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
762
 
        self.assertEquals(t._host, 'exAmple.com')
763
 
        self.assertEquals(t._port, 2222)
764
 
        self.assertEquals(t._user, 'robey')
765
 
        self.assertEquals(t._password, 'h@t')
766
 
        self.assertEquals(t._path, '/path/')
 
860
        self.assertEqual(t._parsed_url.host, 'exAmple.com')
 
861
        self.assertEqual(t._parsed_url.port, 2222)
 
862
        self.assertEqual(t._parsed_url.user, 'robey')
 
863
        self.assertEqual(t._parsed_url.password, 'h@t')
 
864
        self.assertEqual(t._parsed_url.path, '/path/')
767
865
 
768
866
        # Base should not keep track of the password
769
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
867
        self.assertEqual(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
770
868
 
771
869
    def test_parse_invalid_url(self):
772
 
        self.assertRaises(errors.InvalidURL,
 
870
        self.assertRaises(urlutils.InvalidURL,
773
871
                          transport.ConnectedTransport,
774
872
                          'sftp://lily.org:~janneke/public/bzr/gub')
775
873
 
776
874
    def test_relpath(self):
777
875
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
778
876
 
779
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
877
        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
 
878
                         'sub')
780
879
        self.assertRaises(errors.PathNotChild, t.relpath,
781
880
                          'http://user@host.com/abs/path/sub')
782
881
        self.assertRaises(errors.PathNotChild, t.relpath,
787
886
                          'sftp://user@host.com:33/abs/path/sub')
788
887
        # Make sure it works when we don't supply a username
789
888
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
790
 
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
889
        self.assertEqual(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
791
890
 
792
891
        # Make sure it works when parts of the path will be url encoded
793
892
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
794
 
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
893
        self.assertEqual(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
795
894
 
796
895
    def test_connection_sharing_propagate_credentials(self):
797
896
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
798
 
        self.assertEquals('user', t._user)
799
 
        self.assertEquals('host.com', t._host)
 
897
        self.assertEqual('user', t._parsed_url.user)
 
898
        self.assertEqual('host.com', t._parsed_url.host)
800
899
        self.assertIs(None, t._get_connection())
801
 
        self.assertIs(None, t._password)
 
900
        self.assertIs(None, t._parsed_url.password)
802
901
        c = t.clone('subdir')
803
902
        self.assertIs(None, c._get_connection())
804
 
        self.assertIs(None, t._password)
 
903
        self.assertIs(None, t._parsed_url.password)
805
904
 
806
905
        # Simulate the user entering a password
807
906
        password = 'secret'
826
925
 
827
926
    def test_reuse_same_transport(self):
828
927
        possible_transports = []
829
 
        t1 = transport.get_transport('http://foo/',
830
 
                                     possible_transports=possible_transports)
 
928
        t1 = transport.get_transport_from_url(
 
929
            'http://foo/', possible_transports=possible_transports)
831
930
        self.assertEqual([t1], possible_transports)
832
 
        t2 = transport.get_transport('http://foo/',
833
 
                                     possible_transports=[t1])
 
931
        t2 = transport.get_transport_from_url('http://foo/',
 
932
                                              possible_transports=[t1])
834
933
        self.assertIs(t1, t2)
835
934
 
836
935
        # Also check that final '/' are handled correctly
837
 
        t3 = transport.get_transport('http://foo/path/')
838
 
        t4 = transport.get_transport('http://foo/path',
839
 
                                     possible_transports=[t3])
 
936
        t3 = transport.get_transport_from_url('http://foo/path/')
 
937
        t4 = transport.get_transport_from_url('http://foo/path',
 
938
                                              possible_transports=[t3])
840
939
        self.assertIs(t3, t4)
841
940
 
842
 
        t5 = transport.get_transport('http://foo/path')
843
 
        t6 = transport.get_transport('http://foo/path/',
844
 
                                     possible_transports=[t5])
 
941
        t5 = transport.get_transport_from_url('http://foo/path')
 
942
        t6 = transport.get_transport_from_url('http://foo/path/',
 
943
                                              possible_transports=[t5])
845
944
        self.assertIs(t5, t6)
846
945
 
847
946
    def test_don_t_reuse_different_transport(self):
848
 
        t1 = transport.get_transport('http://foo/path')
849
 
        t2 = transport.get_transport('http://bar/path',
850
 
                                     possible_transports=[t1])
 
947
        t1 = transport.get_transport_from_url('http://foo/path')
 
948
        t2 = transport.get_transport_from_url('http://bar/path',
 
949
                                              possible_transports=[t1])
851
950
        self.assertIsNot(t1, t2)
852
951
 
853
952
 
854
953
class TestTransportTrace(tests.TestCase):
855
954
 
856
 
    def test_get(self):
857
 
        t = transport.get_transport('trace+memory://')
858
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
955
    def test_decorator(self):
 
956
        t = transport.get_transport_from_url('trace+memory://')
 
957
        self.assertIsInstance(
 
958
            t, breezy.transport.trace.TransportTraceDecorator)
859
959
 
860
960
    def test_clone_preserves_activity(self):
861
 
        t = transport.get_transport('trace+memory://')
 
961
        t = transport.get_transport_from_url('trace+memory://')
862
962
        t2 = t.clone('.')
863
963
        self.assertTrue(t is not t2)
864
964
        self.assertTrue(t._activity is t2._activity)
868
968
    # still won't cause a test failure when the top level Transport API
869
969
    # changes; so there is little return doing that.
870
970
    def test_get(self):
871
 
        t = transport.get_transport('trace+memory:///')
872
 
        t.put_bytes('foo', 'barish')
 
971
        t = transport.get_transport_from_url('trace+memory:///')
 
972
        t.put_bytes('foo', b'barish')
873
973
        t.get('foo')
874
974
        expected_result = []
875
975
        # put_bytes records the bytes, not the content to avoid memory
880
980
        self.assertEqual(expected_result, t._activity)
881
981
 
882
982
    def test_readv(self):
883
 
        t = transport.get_transport('trace+memory:///')
884
 
        t.put_bytes('foo', 'barish')
 
983
        t = transport.get_transport_from_url('trace+memory:///')
 
984
        t.put_bytes('foo', b'barish')
885
985
        list(t.readv('foo', [(0, 1), (3, 2)],
886
986
                     adjust_for_latency=True, upper_limit=6))
887
987
        expected_result = []
896
996
class TestSSHConnections(tests.TestCaseWithTransport):
897
997
 
898
998
    def test_bzr_connect_to_bzr_ssh(self):
899
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
999
        """get_transport of a bzr+ssh:// behaves correctly.
900
1000
 
901
1001
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
902
1002
        """
909
1009
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
910
1010
        # override the interface (doesn't change self._vendor).
911
1011
        # Note that this does encryption, so can be slow.
912
 
        from bzrlib.tests import stub_sftp
 
1012
        from breezy.tests import stub_sftp
913
1013
 
914
1014
        # Start an SSH server
915
1015
        self.command_executed = []
918
1018
        # SSH channel ourselves.  Surely this has already been implemented
919
1019
        # elsewhere?
920
1020
        started = []
 
1021
 
921
1022
        class StubSSHServer(stub_sftp.StubServer):
922
1023
 
923
1024
            test = self
926
1027
                self.test.command_executed.append(command)
927
1028
                proc = subprocess.Popen(
928
1029
                    command, shell=True, stdin=subprocess.PIPE,
929
 
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
1030
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
 
1031
                    bufsize=0)
930
1032
 
931
1033
                # XXX: horribly inefficient, not to mention ugly.
932
 
                # Start a thread for each of stdin/out/err, and relay bytes from
933
 
                # the subprocess to channel and vice versa.
 
1034
                # Start a thread for each of stdin/out/err, and relay bytes
 
1035
                # from the subprocess to channel and vice versa.
934
1036
                def ferry_bytes(read, write, close):
935
1037
                    while True:
936
1038
                        bytes = read(1)
937
 
                        if bytes == '':
 
1039
                        if bytes == b'':
938
1040
                            close()
939
1041
                            break
940
1042
                        write(bytes)
955
1057
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
956
1058
        # We *don't* want to override the default SSH vendor: the detected one
957
1059
        # is the one to use.
 
1060
 
 
1061
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
 
1062
        # inherits from SFTPServer which forces the SSH vendor to
 
1063
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
958
1064
        self.start_server(ssh_server)
959
 
        port = ssh_server._listener.port
 
1065
        port = ssh_server.port
960
1066
 
961
1067
        if sys.platform == 'win32':
962
 
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
 
1068
            bzr_remote_path = sys.executable + ' ' + self.get_brz_path()
963
1069
        else:
964
 
            bzr_remote_path = self.get_bzr_path()
965
 
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
1070
            bzr_remote_path = self.get_brz_path()
 
1071
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
966
1072
 
967
1073
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
968
1074
        # variable is used to tell bzr what command to run on the remote end.
977
1083
        t.mkdir('foo')
978
1084
 
979
1085
        self.assertEqual(
980
 
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
 
1086
            [b'%s serve --inet --directory=/ --allow-writes' %
 
1087
                bzr_remote_path.encode()],
981
1088
            self.command_executed)
982
1089
        # Make sure to disconnect, so that the remote process can stop, and we
983
1090
        # can cleanup. Then pause the test until everything is shutdown
989
1096
        # And the rest are threads
990
1097
        for t in started[1:]:
991
1098
            t.join()
 
1099
 
 
1100
 
 
1101
class TestUnhtml(tests.TestCase):
 
1102
 
 
1103
    """Tests for unhtml_roughly"""
 
1104
 
 
1105
    def test_truncation(self):
 
1106
        fake_html = "<p>something!\n" * 1000
 
1107
        result = http.unhtml_roughly(fake_html)
 
1108
        self.assertEqual(len(result), 1000)
 
1109
        self.assertStartsWith(result, " something!")