/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_transport.py

  • Committer: Breezy landing bot
  • Author(s): Jelmer Vernooij
  • Date: 2020-08-23 01:15:41 UTC
  • mfrom: (7520.1.4 merge-3.1)
  • Revision ID: breezy.the.bot@gmail.com-20200823011541-nv0oh7nzaganx2qy
Merge lp:brz/3.1.

Merged from https://code.launchpad.net/~jelmer/brz/merge-3.1/+merge/389690

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
 
18
 
from cStringIO import StringIO
 
18
import errno
 
19
from io import BytesIO
19
20
import os
20
21
import subprocess
21
22
import sys
22
23
import threading
23
24
 
24
 
from bzrlib import (
 
25
from .. import (
25
26
    errors,
26
27
    osutils,
27
28
    tests,
28
29
    transport,
29
30
    urlutils,
30
31
    )
31
 
from bzrlib.transport import (
 
32
from ..transport import (
32
33
    chroot,
33
34
    fakenfs,
 
35
    http,
34
36
    local,
35
37
    memory,
36
38
    pathfilter,
37
39
    readonly,
38
40
    )
39
 
from bzrlib.tests import (
 
41
import breezy.transport.trace
 
42
from . import (
40
43
    features,
41
44
    test_server,
42
45
    )
48
51
class TestTransport(tests.TestCase):
49
52
    """Test the non transport-concrete class functionality."""
50
53
 
51
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
52
 
    # of try/finally -- vila 20100205
53
 
 
54
54
    def test__get_set_protocol_handlers(self):
55
55
        handlers = transport._get_protocol_handlers()
56
 
        self.assertNotEqual([], handlers.keys( ))
57
 
        try:
58
 
            transport._clear_protocol_handlers()
59
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
60
 
        finally:
61
 
            transport._set_protocol_handlers(handlers)
 
56
        self.assertNotEqual([], handlers.keys())
 
57
        transport._clear_protocol_handlers()
 
58
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
59
        self.assertEqual([], transport._get_protocol_handlers().keys())
62
60
 
63
61
    def test_get_transport_modules(self):
64
62
        handlers = transport._get_protocol_handlers()
 
63
        self.addCleanup(transport._set_protocol_handlers, handlers)
65
64
        # don't pollute the current handlers
66
65
        transport._clear_protocol_handlers()
 
66
 
67
67
        class SampleHandler(object):
68
68
            """I exist, isnt that enough?"""
69
 
        try:
70
 
            transport._clear_protocol_handlers()
71
 
            transport.register_transport_proto('foo')
72
 
            transport.register_lazy_transport('foo',
73
 
                                              'bzrlib.tests.test_transport',
74
 
                                              'TestTransport.SampleHandler')
75
 
            transport.register_transport_proto('bar')
76
 
            transport.register_lazy_transport('bar',
77
 
                                              'bzrlib.tests.test_transport',
78
 
                                              'TestTransport.SampleHandler')
79
 
            self.assertEqual([SampleHandler.__module__,
80
 
                              'bzrlib.transport.chroot',
81
 
                              'bzrlib.transport.pathfilter'],
82
 
                             transport._get_transport_modules())
83
 
        finally:
84
 
            transport._set_protocol_handlers(handlers)
 
69
        transport._clear_protocol_handlers()
 
70
        transport.register_transport_proto('foo')
 
71
        transport.register_lazy_transport('foo',
 
72
                                          'breezy.tests.test_transport',
 
73
                                          'TestTransport.SampleHandler')
 
74
        transport.register_transport_proto('bar')
 
75
        transport.register_lazy_transport('bar',
 
76
                                          'breezy.tests.test_transport',
 
77
                                          'TestTransport.SampleHandler')
 
78
        self.assertEqual([SampleHandler.__module__,
 
79
                          'breezy.transport.chroot',
 
80
                          'breezy.transport.pathfilter'],
 
81
                         transport._get_transport_modules())
85
82
 
86
83
    def test_transport_dependency(self):
87
84
        """Transport with missing dependency causes no error"""
88
85
        saved_handlers = transport._get_protocol_handlers()
 
86
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
89
87
        # don't pollute the current handlers
90
88
        transport._clear_protocol_handlers()
 
89
        transport.register_transport_proto('foo')
 
90
        transport.register_lazy_transport(
 
91
            'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
91
92
        try:
92
 
            transport.register_transport_proto('foo')
93
 
            transport.register_lazy_transport(
94
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
95
 
            try:
96
 
                transport.get_transport('foo://fooserver/foo')
97
 
            except errors.UnsupportedProtocol, e:
98
 
                e_str = str(e)
99
 
                self.assertEquals('Unsupported protocol'
100
 
                                  ' for url "foo://fooserver/foo":'
101
 
                                  ' Unable to import library "some_lib":'
102
 
                                  ' testing missing dependency', str(e))
103
 
            else:
104
 
                self.fail('Did not raise UnsupportedProtocol')
105
 
        finally:
106
 
            # restore original values
107
 
            transport._set_protocol_handlers(saved_handlers)
 
93
            transport.get_transport_from_url('foo://fooserver/foo')
 
94
        except errors.UnsupportedProtocol as e:
 
95
            self.assertEqual('Unsupported protocol'
 
96
                             ' for url "foo://fooserver/foo":'
 
97
                             ' Unable to import library "some_lib":'
 
98
                             ' testing missing dependency', str(e))
 
99
        else:
 
100
            self.fail('Did not raise UnsupportedProtocol')
108
101
 
109
102
    def test_transport_fallback(self):
110
103
        """Transport with missing dependency causes no error"""
111
104
        saved_handlers = transport._get_protocol_handlers()
112
 
        try:
113
 
            transport._clear_protocol_handlers()
114
 
            transport.register_transport_proto('foo')
115
 
            transport.register_lazy_transport(
116
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
117
 
            transport.register_lazy_transport(
118
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
119
 
            t = transport.get_transport('foo://fooserver/foo')
120
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
121
 
        finally:
122
 
            transport._set_protocol_handlers(saved_handlers)
 
105
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
106
        transport._clear_protocol_handlers()
 
107
        transport.register_transport_proto('foo')
 
108
        transport.register_lazy_transport(
 
109
            'foo', 'breezy.tests.test_transport', 'BackupTransportHandler')
 
110
        transport.register_lazy_transport(
 
111
            'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
 
112
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
113
        self.assertTrue(isinstance(t, BackupTransportHandler))
123
114
 
124
115
    def test_ssh_hints(self):
125
116
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
126
117
        try:
127
 
            transport.get_transport('ssh://fooserver/foo')
128
 
        except errors.UnsupportedProtocol, e:
129
 
            e_str = str(e)
130
 
            self.assertEquals('Unsupported protocol'
131
 
                              ' for url "ssh://fooserver/foo":'
132
 
                              ' bzr supports bzr+ssh to operate over ssh,'
133
 
                              ' use "bzr+ssh://fooserver/foo".',
134
 
                              str(e))
 
118
            transport.get_transport_from_url('ssh://fooserver/foo')
 
119
        except errors.UnsupportedProtocol as e:
 
120
            self.assertEqual(
 
121
                'Unsupported protocol'
 
122
                ' for url "ssh://fooserver/foo":'
 
123
                ' Use bzr+ssh for Bazaar operations over SSH, '
 
124
                'e.g. "bzr+ssh://fooserver/foo". Use git+ssh '
 
125
                'for Git operations over SSH, e.g. "git+ssh://fooserver/foo".',
 
126
                str(e))
135
127
        else:
136
128
            self.fail('Did not raise UnsupportedProtocol')
137
129
 
140
132
        a_file = transport.LateReadError('a path')
141
133
        try:
142
134
            a_file.read()
143
 
        except errors.ReadError, error:
 
135
        except errors.ReadError as error:
144
136
            self.assertEqual('a path', error.path)
145
137
        self.assertRaises(errors.ReadError, a_file.read, 40)
146
138
        a_file.close()
147
139
 
148
 
    def test__combine_paths(self):
149
 
        t = transport.Transport('/')
150
 
        self.assertEqual('/home/sarah/project/foo',
151
 
                         t._combine_paths('/home/sarah', 'project/foo'))
152
 
        self.assertEqual('/etc',
153
 
                         t._combine_paths('/home/sarah', '../../etc'))
154
 
        self.assertEqual('/etc',
155
 
                         t._combine_paths('/home/sarah', '../../../etc'))
156
 
        self.assertEqual('/etc',
157
 
                         t._combine_paths('/home/sarah', '/etc'))
158
 
 
159
140
    def test_local_abspath_non_local_transport(self):
160
141
        # the base implementation should throw
161
142
        t = memory.MemoryTransport()
181
162
    def test_coalesce_unrelated(self):
182
163
        self.check([(0, 10, [(0, 10)]),
183
164
                    (20, 10, [(0, 10)]),
184
 
                   ], [(0, 10), (20, 10)])
 
165
                    ], [(0, 10), (20, 10)])
185
166
 
186
167
    def test_coalesce_unsorted(self):
187
168
        self.check([(20, 10, [(0, 10)]),
188
169
                    (0, 10, [(0, 10)]),
189
 
                   ], [(20, 10), (0, 10)])
 
170
                    ], [(20, 10), (0, 10)])
190
171
 
191
172
    def test_coalesce_nearby(self):
192
173
        self.check([(0, 20, [(0, 10), (10, 10)])],
194
175
 
195
176
    def test_coalesce_overlapped(self):
196
177
        self.assertRaises(ValueError,
197
 
            self.check, [(0, 15, [(0, 10), (5, 10)])],
198
 
                        [(0, 10), (5, 10)])
 
178
                          self.check, [(0, 15, [(0, 10), (5, 10)])],
 
179
                          [(0, 10), (5, 10)])
199
180
 
200
181
    def test_coalesce_limit(self):
201
182
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
202
183
                              (30, 10), (40, 10)]),
203
184
                    (60, 50, [(0, 10), (10, 10), (20, 10),
204
185
                              (30, 10), (40, 10)]),
205
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
206
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
207
 
                       (90, 10), (100, 10)],
208
 
                    limit=5)
 
186
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
187
                        (50, 10), (60, 10), (70, 10), (80, 10),
 
188
                        (90, 10), (100, 10)],
 
189
                   limit=5)
209
190
 
210
191
    def test_coalesce_no_limit(self):
211
192
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
212
193
                               (30, 10), (40, 10), (50, 10),
213
194
                               (60, 10), (70, 10), (80, 10),
214
195
                               (90, 10)]),
215
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
216
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
217
 
                       (90, 10), (100, 10)])
 
196
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
197
                        (50, 10), (60, 10), (70, 10), (80, 10),
 
198
                        (90, 10), (100, 10)])
218
199
 
219
200
    def test_coalesce_fudge(self):
220
201
        self.check([(10, 30, [(0, 10), (20, 10)]),
221
 
                    (100, 10, [(0, 10),]),
222
 
                   ], [(10, 10), (30, 10), (100, 10)],
223
 
                   fudge=10
224
 
                  )
 
202
                    (100, 10, [(0, 10)]),
 
203
                    ], [(10, 10), (30, 10), (100, 10)],
 
204
                   fudge=10)
 
205
 
225
206
    def test_coalesce_max_size(self):
226
207
        self.check([(10, 20, [(0, 10), (10, 10)]),
227
208
                    (30, 50, [(0, 50)]),
228
209
                    # If one range is above max_size, it gets its own coalesced
229
210
                    # offset
230
 
                    (100, 80, [(0, 80),]),],
 
211
                    (100, 80, [(0, 80)]), ],
231
212
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
232
 
                   max_size=50
233
 
                  )
 
213
                   max_size=50)
234
214
 
235
215
    def test_coalesce_no_max_size(self):
236
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
216
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
237
217
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
238
 
                  )
 
218
                   )
239
219
 
240
220
    def test_coalesce_default_limit(self):
241
221
        # By default we use a 100MB max size.
242
 
        ten_mb = 10*1024*1024
243
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
244
 
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
245
 
                   [(i*ten_mb, ten_mb) for i in range(11)])
246
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
247
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
248
 
                   max_size=1*1024*1024*1024)
 
222
        ten_mb = 10 * 1024 * 1024
 
223
        self.check(
 
224
            [(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
225
             (10 * ten_mb, ten_mb, [(0, ten_mb)])],
 
226
            [(i * ten_mb, ten_mb) for i in range(11)])
 
227
        self.check(
 
228
            [(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
229
            [(i * ten_mb, ten_mb) for i in range(11)],
 
230
            max_size=1 * 1024 * 1024 * 1024)
249
231
 
250
232
 
251
233
class TestMemoryServer(tests.TestCase):
255
237
        server.start_server()
256
238
        url = server.get_url()
257
239
        self.assertTrue(url in transport.transport_list_registry)
258
 
        t = transport.get_transport(url)
 
240
        t = transport.get_transport_from_url(url)
259
241
        del t
260
242
        server.stop_server()
261
243
        self.assertFalse(url in transport.transport_list_registry)
288
270
 
289
271
    def test_append_and_get(self):
290
272
        t = memory.MemoryTransport()
291
 
        t.append_bytes('path', 'content')
292
 
        self.assertEqual(t.get('path').read(), 'content')
293
 
        t.append_file('path', StringIO('content'))
294
 
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
273
        t.append_bytes('path', b'content')
 
274
        self.assertEqual(t.get('path').read(), b'content')
 
275
        t.append_file('path', BytesIO(b'content'))
 
276
        with t.get('path') as f:
 
277
            self.assertEqual(f.read(), b'contentcontent')
295
278
 
296
279
    def test_put_and_get(self):
297
280
        t = memory.MemoryTransport()
298
 
        t.put_file('path', StringIO('content'))
299
 
        self.assertEqual(t.get('path').read(), 'content')
300
 
        t.put_bytes('path', 'content')
301
 
        self.assertEqual(t.get('path').read(), 'content')
 
281
        t.put_file('path', BytesIO(b'content'))
 
282
        self.assertEqual(t.get('path').read(), b'content')
 
283
        t.put_bytes('path', b'content')
 
284
        self.assertEqual(t.get('path').read(), b'content')
302
285
 
303
286
    def test_append_without_dir_fails(self):
304
287
        t = memory.MemoryTransport()
305
288
        self.assertRaises(errors.NoSuchFile,
306
 
                          t.append_bytes, 'dir/path', 'content')
 
289
                          t.append_bytes, 'dir/path', b'content')
307
290
 
308
291
    def test_put_without_dir_fails(self):
309
292
        t = memory.MemoryTransport()
310
293
        self.assertRaises(errors.NoSuchFile,
311
 
                          t.put_file, 'dir/path', StringIO('content'))
 
294
                          t.put_file, 'dir/path', BytesIO(b'content'))
312
295
 
313
296
    def test_get_missing(self):
314
297
        transport = memory.MemoryTransport()
316
299
 
317
300
    def test_has_missing(self):
318
301
        t = memory.MemoryTransport()
319
 
        self.assertEquals(False, t.has('foo'))
 
302
        self.assertEqual(False, t.has('foo'))
320
303
 
321
304
    def test_has_present(self):
322
305
        t = memory.MemoryTransport()
323
 
        t.append_bytes('foo', 'content')
324
 
        self.assertEquals(True, t.has('foo'))
 
306
        t.append_bytes('foo', b'content')
 
307
        self.assertEqual(True, t.has('foo'))
325
308
 
326
309
    def test_list_dir(self):
327
310
        t = memory.MemoryTransport()
328
 
        t.put_bytes('foo', 'content')
 
311
        t.put_bytes('foo', b'content')
329
312
        t.mkdir('dir')
330
 
        t.put_bytes('dir/subfoo', 'content')
331
 
        t.put_bytes('dirlike', 'content')
 
313
        t.put_bytes('dir/subfoo', b'content')
 
314
        t.put_bytes('dirlike', b'content')
332
315
 
333
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
334
 
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
316
        self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
317
        self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
335
318
 
336
319
    def test_mkdir(self):
337
320
        t = memory.MemoryTransport()
338
321
        t.mkdir('dir')
339
 
        t.append_bytes('dir/path', 'content')
340
 
        self.assertEqual(t.get('dir/path').read(), 'content')
 
322
        t.append_bytes('dir/path', b'content')
 
323
        with t.get('dir/path') as f:
 
324
            self.assertEqual(f.read(), b'content')
341
325
 
342
326
    def test_mkdir_missing_parent(self):
343
327
        t = memory.MemoryTransport()
356
340
    def test_iter_files_recursive(self):
357
341
        t = memory.MemoryTransport()
358
342
        t.mkdir('dir')
359
 
        t.put_bytes('dir/foo', 'content')
360
 
        t.put_bytes('dir/bar', 'content')
361
 
        t.put_bytes('bar', 'content')
 
343
        t.put_bytes('dir/foo', b'content')
 
344
        t.put_bytes('dir/bar', b'content')
 
345
        t.put_bytes('bar', b'content')
362
346
        paths = set(t.iter_files_recursive())
363
 
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
347
        self.assertEqual({'dir/foo', 'dir/bar', 'bar'}, paths)
364
348
 
365
349
    def test_stat(self):
366
350
        t = memory.MemoryTransport()
367
 
        t.put_bytes('foo', 'content')
368
 
        t.put_bytes('bar', 'phowar')
 
351
        t.put_bytes('foo', b'content')
 
352
        t.put_bytes('bar', b'phowar')
369
353
        self.assertEqual(7, t.stat('foo').st_size)
370
354
        self.assertEqual(6, t.stat('bar').st_size)
371
355
 
376
360
    def test_abspath(self):
377
361
        # The abspath is always relative to the chroot_url.
378
362
        server = chroot.ChrootServer(
379
 
            transport.get_transport('memory:///foo/bar/'))
 
363
            transport.get_transport_from_url('memory:///foo/bar/'))
380
364
        self.start_server(server)
381
 
        t = transport.get_transport(server.get_url())
 
365
        t = transport.get_transport_from_url(server.get_url())
382
366
        self.assertEqual(server.get_url(), t.abspath('/'))
383
367
 
384
368
        subdir_t = t.clone('subdir')
386
370
 
387
371
    def test_clone(self):
388
372
        server = chroot.ChrootServer(
389
 
            transport.get_transport('memory:///foo/bar/'))
 
373
            transport.get_transport_from_url('memory:///foo/bar/'))
390
374
        self.start_server(server)
391
 
        t = transport.get_transport(server.get_url())
 
375
        t = transport.get_transport_from_url(server.get_url())
392
376
        # relpath from root and root path are the same
393
377
        relpath_cloned = t.clone('foo')
394
378
        abspath_cloned = t.clone('/foo')
403
387
        This is so that it is not possible to escape a chroot by doing::
404
388
            url = chroot_transport.base
405
389
            parent_url = urlutils.join(url, '..')
406
 
            new_t = transport.get_transport(parent_url)
 
390
            new_t = transport.get_transport_from_url(parent_url)
407
391
        """
408
392
        server = chroot.ChrootServer(
409
 
            transport.get_transport('memory:///path/subpath'))
 
393
            transport.get_transport_from_url('memory:///path/subpath'))
410
394
        self.start_server(server)
411
 
        t = transport.get_transport(server.get_url())
412
 
        new_t = transport.get_transport(t.base)
 
395
        t = transport.get_transport_from_url(server.get_url())
 
396
        new_t = transport.get_transport_from_url(t.base)
413
397
        self.assertEqual(t.server, new_t.server)
414
398
        self.assertEqual(t.base, new_t.base)
415
399
 
420
404
        This is so that it is not possible to escape a chroot by doing::
421
405
            url = chroot_transport.base
422
406
            parent_url = urlutils.join(url, '..')
423
 
            new_t = transport.get_transport(parent_url)
 
407
            new_t = transport.get_transport_from_url(parent_url)
424
408
        """
425
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
409
        server = chroot.ChrootServer(
 
410
            transport.get_transport_from_url('memory:///path/'))
426
411
        self.start_server(server)
427
 
        t = transport.get_transport(server.get_url())
 
412
        t = transport.get_transport_from_url(server.get_url())
428
413
        self.assertRaises(
429
 
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
 
414
            urlutils.InvalidURLJoin, urlutils.join, t.base, '..')
430
415
 
431
416
 
432
417
class TestChrootServer(tests.TestCase):
440
425
        backing_transport = memory.MemoryTransport()
441
426
        server = chroot.ChrootServer(backing_transport)
442
427
        server.start_server()
443
 
        try:
444
 
            self.assertTrue(server.scheme
445
 
                            in transport._get_protocol_handlers().keys())
446
 
        finally:
447
 
            server.stop_server()
 
428
        self.addCleanup(server.stop_server)
 
429
        self.assertTrue(server.scheme
 
430
                        in transport._get_protocol_handlers().keys())
448
431
 
449
432
    def test_stop_server(self):
450
433
        backing_transport = memory.MemoryTransport()
458
441
        backing_transport = memory.MemoryTransport()
459
442
        server = chroot.ChrootServer(backing_transport)
460
443
        server.start_server()
461
 
        try:
462
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
463
 
        finally:
464
 
            server.stop_server()
 
444
        self.addCleanup(server.stop_server)
 
445
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
446
 
 
447
 
 
448
class TestHooks(tests.TestCase):
 
449
    """Basic tests for transport hooks"""
 
450
 
 
451
    def _get_connected_transport(self):
 
452
        return transport.ConnectedTransport("bogus:nowhere")
 
453
 
 
454
    def test_transporthooks_initialisation(self):
 
455
        """Check all expected transport hook points are set up"""
 
456
        hookpoint = transport.TransportHooks()
 
457
        self.assertTrue("post_connect" in hookpoint,
 
458
                        "post_connect not in %s" % (hookpoint,))
 
459
 
 
460
    def test_post_connect(self):
 
461
        """Ensure the post_connect hook is called when _set_transport is"""
 
462
        calls = []
 
463
        transport.Transport.hooks.install_named_hook("post_connect",
 
464
                                                     calls.append, None)
 
465
        t = self._get_connected_transport()
 
466
        self.assertLength(0, calls)
 
467
        t._set_connection("connection", "auth")
 
468
        self.assertEqual(calls, [t])
465
469
 
466
470
 
467
471
class PathFilteringDecoratorTransportTest(tests.TestCase):
470
474
    def test_abspath(self):
471
475
        # The abspath is always relative to the base of the backing transport.
472
476
        server = pathfilter.PathFilteringServer(
473
 
            transport.get_transport('memory:///foo/bar/'),
 
477
            transport.get_transport_from_url('memory:///foo/bar/'),
474
478
            lambda x: x)
475
479
        server.start_server()
476
 
        t = transport.get_transport(server.get_url())
 
480
        t = transport.get_transport_from_url(server.get_url())
477
481
        self.assertEqual(server.get_url(), t.abspath('/'))
478
482
 
479
483
        subdir_t = t.clone('subdir')
482
486
 
483
487
    def make_pf_transport(self, filter_func=None):
484
488
        """Make a PathFilteringTransport backed by a MemoryTransport.
485
 
        
 
489
 
486
490
        :param filter_func: by default this will be a no-op function.  Use this
487
491
            parameter to override it."""
488
492
        if filter_func is None:
489
 
            filter_func = lambda x: x
 
493
            def filter_func(x):
 
494
                return x
490
495
        server = pathfilter.PathFilteringServer(
491
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
496
            transport.get_transport_from_url('memory:///foo/bar/'),
 
497
            filter_func)
492
498
        server.start_server()
493
499
        self.addCleanup(server.stop_server)
494
 
        return transport.get_transport(server.get_url())
 
500
        return transport.get_transport_from_url(server.get_url())
495
501
 
496
502
    def test__filter(self):
497
503
        # _filter (with an identity func as filter_func) always returns
510
516
 
511
517
    def test_filter_invocation(self):
512
518
        filter_log = []
 
519
 
513
520
        def filter(path):
514
521
            filter_log.append(path)
515
522
            return path
540
547
        otherwise) the filtering by doing::
541
548
            url = filtered_transport.base
542
549
            parent_url = urlutils.join(url, '..')
543
 
            new_t = transport.get_transport(parent_url)
 
550
            new_t = transport.get_transport_from_url(parent_url)
544
551
        """
545
552
        t = self.make_pf_transport()
546
 
        new_t = transport.get_transport(t.base)
 
553
        new_t = transport.get_transport_from_url(t.base)
547
554
        self.assertEqual(t.server, new_t.server)
548
555
        self.assertEqual(t.base, new_t.base)
549
556
 
558
565
        self.assertEqual(True, t.is_readonly())
559
566
 
560
567
    def test_http_parameters(self):
561
 
        from bzrlib.tests.http_server import HttpServer
 
568
        from breezy.tests.http_server import HttpServer
562
569
        # connect to '.' via http which is not listable
563
570
        server = HttpServer()
564
571
        self.start_server(server)
565
 
        t = transport.get_transport('readonly+' + server.get_url())
566
 
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
 
572
        t = transport.get_transport_from_url('readonly+' + server.get_url())
 
573
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
567
574
        self.assertEqual(False, t.listable())
568
575
        self.assertEqual(True, t.is_readonly())
569
576
 
585
592
    def test_http_parameters(self):
586
593
        # the listable and is_readonly parameters
587
594
        # are not changed by the fakenfs decorator
588
 
        from bzrlib.tests.http_server import HttpServer
 
595
        from breezy.tests.http_server import HttpServer
589
596
        # connect to '.' via http which is not listable
590
597
        server = HttpServer()
591
598
        self.start_server(server)
601
608
        # the url should be decorated appropriately
602
609
        self.assertStartsWith(server.get_url(), 'fakenfs+')
603
610
        # and we should be able to get a transport for it
604
 
        t = transport.get_transport(server.get_url())
 
611
        t = transport.get_transport_from_url(server.get_url())
605
612
        # which must be a FakeNFSTransportDecorator instance.
606
613
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
607
614
 
619
626
 
620
627
    def get_vfat_transport(self, url):
621
628
        """Return vfat-backed transport for test directory"""
622
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
629
        from breezy.transport.fakevfat import FakeVFATTransportDecorator
623
630
        return FakeVFATTransportDecorator('vfat+' + url)
624
631
 
625
632
    def test_transport_creation(self):
626
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
633
        from breezy.transport.fakevfat import FakeVFATTransportDecorator
627
634
        t = self.get_vfat_transport('.')
628
635
        self.assertIsInstance(t, FakeVFATTransportDecorator)
629
636
 
654
661
 
655
662
    To verify a transport we need a server factory, which is a callable
656
663
    that accepts no parameters and returns an implementation of
657
 
    bzrlib.transport.Server.
 
664
    breezy.transport.Server.
658
665
 
659
666
    That Server is then used to construct transport instances and test
660
667
    the transport via loopback activity.
684
691
        base_url = self._server.get_url()
685
692
        url = self._adjust_url(base_url, relpath)
686
693
        # try getting the transport via the regular interface:
687
 
        t = transport.get_transport(url)
 
694
        t = transport.get_transport_from_url(url)
688
695
        # vila--20070607 if the following are commented out the test suite
689
696
        # still pass. Is this really still needed or was it a forgotten
690
697
        # temporary fix ?
695
702
        return t
696
703
 
697
704
 
 
705
class TestTransportFromPath(tests.TestCaseInTempDir):
 
706
 
 
707
    def test_with_path(self):
 
708
        t = transport.get_transport_from_path(self.test_dir)
 
709
        self.assertIsInstance(t, local.LocalTransport)
 
710
        self.assertEqual(t.base.rstrip("/"),
 
711
                         urlutils.local_path_to_url(self.test_dir))
 
712
 
 
713
    def test_with_url(self):
 
714
        t = transport.get_transport_from_path("file:")
 
715
        self.assertIsInstance(t, local.LocalTransport)
 
716
        self.assertEqual(
 
717
            t.base.rstrip("/"),
 
718
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
719
 
 
720
 
 
721
class TestTransportFromUrl(tests.TestCaseInTempDir):
 
722
 
 
723
    def test_with_path(self):
 
724
        self.assertRaises(urlutils.InvalidURL, transport.get_transport_from_url,
 
725
                          self.test_dir)
 
726
 
 
727
    def test_with_url(self):
 
728
        url = urlutils.local_path_to_url(self.test_dir)
 
729
        t = transport.get_transport_from_url(url)
 
730
        self.assertIsInstance(t, local.LocalTransport)
 
731
        self.assertEqual(t.base.rstrip("/"), url)
 
732
 
 
733
    def test_with_url_and_segment_parameters(self):
 
734
        url = urlutils.local_path_to_url(self.test_dir) + ",branch=foo"
 
735
        t = transport.get_transport_from_url(url)
 
736
        self.assertIsInstance(t, local.LocalTransport)
 
737
        self.assertEqual(t.base.rstrip("/"), url)
 
738
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
 
739
            f.write("data")
 
740
        self.assertTrue(t.has("afile"))
 
741
 
 
742
 
698
743
class TestLocalTransports(tests.TestCase):
699
744
 
700
745
    def test_get_transport_from_abspath(self):
701
746
        here = osutils.abspath('.')
702
747
        t = transport.get_transport(here)
703
748
        self.assertIsInstance(t, local.LocalTransport)
704
 
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
749
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')
705
750
 
706
751
    def test_get_transport_from_relpath(self):
707
 
        here = osutils.abspath('.')
708
752
        t = transport.get_transport('.')
709
753
        self.assertIsInstance(t, local.LocalTransport)
710
 
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
754
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')
711
755
 
712
756
    def test_get_transport_from_local_url(self):
713
757
        here = osutils.abspath('.')
714
758
        here_url = urlutils.local_path_to_url(here) + '/'
715
759
        t = transport.get_transport(here_url)
716
760
        self.assertIsInstance(t, local.LocalTransport)
717
 
        self.assertEquals(t.base, here_url)
 
761
        self.assertEqual(t.base, here_url)
718
762
 
719
763
    def test_local_abspath(self):
720
764
        here = osutils.abspath('.')
721
765
        t = transport.get_transport(here)
722
 
        self.assertEquals(t.local_abspath(''), here)
 
766
        self.assertEqual(t.local_abspath(''), here)
 
767
 
 
768
 
 
769
class TestLocalTransportMutation(tests.TestCaseInTempDir):
 
770
 
 
771
    def test_local_transport_mkdir(self):
 
772
        here = osutils.abspath('.')
 
773
        t = transport.get_transport(here)
 
774
        t.mkdir('test')
 
775
        self.assertTrue(os.path.exists('test'))
 
776
 
 
777
    def test_local_transport_mkdir_permission_denied(self):
 
778
        # See https://bugs.launchpad.net/bzr/+bug/606537
 
779
        here = osutils.abspath('.')
 
780
        t = transport.get_transport(here)
 
781
 
 
782
        def fake_chmod(path, mode):
 
783
            e = OSError('permission denied')
 
784
            e.errno = errno.EPERM
 
785
            raise e
 
786
        self.overrideAttr(os, 'chmod', fake_chmod)
 
787
        t.mkdir('test')
 
788
        t.mkdir('test2', mode=0o707)
 
789
        self.assertTrue(os.path.exists('test'))
 
790
        self.assertTrue(os.path.exists('test2'))
 
791
 
 
792
 
 
793
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
 
794
 
 
795
    def test_local_fdatasync_calls_fdatasync(self):
 
796
        """Check fdatasync on a stream tries to flush the data to the OS.
 
797
 
 
798
        We can't easily observe the external effect but we can at least see
 
799
        it's called.
 
800
        """
 
801
        sentinel = object()
 
802
        fdatasync = getattr(os, 'fdatasync', sentinel)
 
803
        if fdatasync is sentinel:
 
804
            raise tests.TestNotApplicable('fdatasync not supported')
 
805
        t = self.get_transport('.')
 
806
        calls = self.recordCalls(os, 'fdatasync')
 
807
        w = t.open_write_stream('out')
 
808
        w.write(b'foo')
 
809
        w.fdatasync()
 
810
        with open('out', 'rb') as f:
 
811
            # Should have been flushed.
 
812
            self.assertEqual(f.read(), b'foo')
 
813
        self.assertEqual(len(calls), 1, calls)
 
814
 
 
815
    def test_missing_directory(self):
 
816
        t = self.get_transport('.')
 
817
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
723
818
 
724
819
 
725
820
class TestWin32LocalTransport(tests.TestCase):
726
821
 
727
822
    def test_unc_clone_to_root(self):
 
823
        self.requireFeature(features.win32_feature)
728
824
        # Win32 UNC path like \\HOST\path
729
825
        # clone to root should stop at least at \\HOST part
730
826
        # not on \\
731
827
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
732
 
        for i in xrange(4):
 
828
        for i in range(4):
733
829
            t = t.clone('..')
734
 
        self.assertEquals(t.base, 'file://HOST/')
 
830
        self.assertEqual(t.base, 'file://HOST/')
735
831
        # make sure we reach the root
736
832
        t = t.clone('..')
737
 
        self.assertEquals(t.base, 'file://HOST/')
 
833
        self.assertEqual(t.base, 'file://HOST/')
738
834
 
739
835
 
740
836
class TestConnectedTransport(tests.TestCase):
743
839
    def test_parse_url(self):
744
840
        t = transport.ConnectedTransport(
745
841
            'http://simple.example.com/home/source')
746
 
        self.assertEquals(t._host, 'simple.example.com')
747
 
        self.assertEquals(t._port, None)
748
 
        self.assertEquals(t._path, '/home/source/')
749
 
        self.failUnless(t._user is None)
750
 
        self.failUnless(t._password is None)
 
842
        self.assertEqual(t._parsed_url.host, 'simple.example.com')
 
843
        self.assertEqual(t._parsed_url.port, None)
 
844
        self.assertEqual(t._parsed_url.path, '/home/source/')
 
845
        self.assertTrue(t._parsed_url.user is None)
 
846
        self.assertTrue(t._parsed_url.password is None)
751
847
 
752
 
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
 
848
        self.assertEqual(t.base, 'http://simple.example.com/home/source/')
753
849
 
754
850
    def test_parse_url_with_at_in_user(self):
755
851
        # Bug 228058
756
852
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
757
 
        self.assertEquals(t._user, 'user@host.com')
 
853
        self.assertEqual(t._parsed_url.user, 'user@host.com')
758
854
 
759
855
    def test_parse_quoted_url(self):
760
856
        t = transport.ConnectedTransport(
761
857
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
762
 
        self.assertEquals(t._host, 'exAmple.com')
763
 
        self.assertEquals(t._port, 2222)
764
 
        self.assertEquals(t._user, 'robey')
765
 
        self.assertEquals(t._password, 'h@t')
766
 
        self.assertEquals(t._path, '/path/')
 
858
        self.assertEqual(t._parsed_url.host, 'exAmple.com')
 
859
        self.assertEqual(t._parsed_url.port, 2222)
 
860
        self.assertEqual(t._parsed_url.user, 'robey')
 
861
        self.assertEqual(t._parsed_url.password, 'h@t')
 
862
        self.assertEqual(t._parsed_url.path, '/path/')
767
863
 
768
864
        # Base should not keep track of the password
769
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
865
        self.assertEqual(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
770
866
 
771
867
    def test_parse_invalid_url(self):
772
 
        self.assertRaises(errors.InvalidURL,
 
868
        self.assertRaises(urlutils.InvalidURL,
773
869
                          transport.ConnectedTransport,
774
870
                          'sftp://lily.org:~janneke/public/bzr/gub')
775
871
 
776
872
    def test_relpath(self):
777
873
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
778
874
 
779
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
875
        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
 
876
                         'sub')
780
877
        self.assertRaises(errors.PathNotChild, t.relpath,
781
878
                          'http://user@host.com/abs/path/sub')
782
879
        self.assertRaises(errors.PathNotChild, t.relpath,
787
884
                          'sftp://user@host.com:33/abs/path/sub')
788
885
        # Make sure it works when we don't supply a username
789
886
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
790
 
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
887
        self.assertEqual(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
791
888
 
792
889
        # Make sure it works when parts of the path will be url encoded
793
890
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
794
 
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
891
        self.assertEqual(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
795
892
 
796
893
    def test_connection_sharing_propagate_credentials(self):
797
894
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
798
 
        self.assertEquals('user', t._user)
799
 
        self.assertEquals('host.com', t._host)
 
895
        self.assertEqual('user', t._parsed_url.user)
 
896
        self.assertEqual('host.com', t._parsed_url.host)
800
897
        self.assertIs(None, t._get_connection())
801
 
        self.assertIs(None, t._password)
 
898
        self.assertIs(None, t._parsed_url.password)
802
899
        c = t.clone('subdir')
803
900
        self.assertIs(None, c._get_connection())
804
 
        self.assertIs(None, t._password)
 
901
        self.assertIs(None, t._parsed_url.password)
805
902
 
806
903
        # Simulate the user entering a password
807
904
        password = 'secret'
826
923
 
827
924
    def test_reuse_same_transport(self):
828
925
        possible_transports = []
829
 
        t1 = transport.get_transport('http://foo/',
830
 
                                     possible_transports=possible_transports)
 
926
        t1 = transport.get_transport_from_url(
 
927
            'http://foo/', possible_transports=possible_transports)
831
928
        self.assertEqual([t1], possible_transports)
832
 
        t2 = transport.get_transport('http://foo/',
833
 
                                     possible_transports=[t1])
 
929
        t2 = transport.get_transport_from_url('http://foo/',
 
930
                                              possible_transports=[t1])
834
931
        self.assertIs(t1, t2)
835
932
 
836
933
        # Also check that final '/' are handled correctly
837
 
        t3 = transport.get_transport('http://foo/path/')
838
 
        t4 = transport.get_transport('http://foo/path',
839
 
                                     possible_transports=[t3])
 
934
        t3 = transport.get_transport_from_url('http://foo/path/')
 
935
        t4 = transport.get_transport_from_url('http://foo/path',
 
936
                                              possible_transports=[t3])
840
937
        self.assertIs(t3, t4)
841
938
 
842
 
        t5 = transport.get_transport('http://foo/path')
843
 
        t6 = transport.get_transport('http://foo/path/',
844
 
                                     possible_transports=[t5])
 
939
        t5 = transport.get_transport_from_url('http://foo/path')
 
940
        t6 = transport.get_transport_from_url('http://foo/path/',
 
941
                                              possible_transports=[t5])
845
942
        self.assertIs(t5, t6)
846
943
 
847
944
    def test_don_t_reuse_different_transport(self):
848
 
        t1 = transport.get_transport('http://foo/path')
849
 
        t2 = transport.get_transport('http://bar/path',
850
 
                                     possible_transports=[t1])
 
945
        t1 = transport.get_transport_from_url('http://foo/path')
 
946
        t2 = transport.get_transport_from_url('http://bar/path',
 
947
                                              possible_transports=[t1])
851
948
        self.assertIsNot(t1, t2)
852
949
 
853
950
 
854
951
class TestTransportTrace(tests.TestCase):
855
952
 
856
 
    def test_get(self):
857
 
        t = transport.get_transport('trace+memory://')
858
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
953
    def test_decorator(self):
 
954
        t = transport.get_transport_from_url('trace+memory://')
 
955
        self.assertIsInstance(
 
956
            t, breezy.transport.trace.TransportTraceDecorator)
859
957
 
860
958
    def test_clone_preserves_activity(self):
861
 
        t = transport.get_transport('trace+memory://')
 
959
        t = transport.get_transport_from_url('trace+memory://')
862
960
        t2 = t.clone('.')
863
961
        self.assertTrue(t is not t2)
864
962
        self.assertTrue(t._activity is t2._activity)
868
966
    # still won't cause a test failure when the top level Transport API
869
967
    # changes; so there is little return doing that.
870
968
    def test_get(self):
871
 
        t = transport.get_transport('trace+memory:///')
872
 
        t.put_bytes('foo', 'barish')
 
969
        t = transport.get_transport_from_url('trace+memory:///')
 
970
        t.put_bytes('foo', b'barish')
873
971
        t.get('foo')
874
972
        expected_result = []
875
973
        # put_bytes records the bytes, not the content to avoid memory
880
978
        self.assertEqual(expected_result, t._activity)
881
979
 
882
980
    def test_readv(self):
883
 
        t = transport.get_transport('trace+memory:///')
884
 
        t.put_bytes('foo', 'barish')
 
981
        t = transport.get_transport_from_url('trace+memory:///')
 
982
        t.put_bytes('foo', b'barish')
885
983
        list(t.readv('foo', [(0, 1), (3, 2)],
886
984
                     adjust_for_latency=True, upper_limit=6))
887
985
        expected_result = []
896
994
class TestSSHConnections(tests.TestCaseWithTransport):
897
995
 
898
996
    def test_bzr_connect_to_bzr_ssh(self):
899
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
997
        """get_transport of a bzr+ssh:// behaves correctly.
900
998
 
901
999
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
902
1000
        """
909
1007
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
910
1008
        # override the interface (doesn't change self._vendor).
911
1009
        # Note that this does encryption, so can be slow.
912
 
        from bzrlib.tests import stub_sftp
 
1010
        from breezy.tests import stub_sftp
913
1011
 
914
1012
        # Start an SSH server
915
1013
        self.command_executed = []
918
1016
        # SSH channel ourselves.  Surely this has already been implemented
919
1017
        # elsewhere?
920
1018
        started = []
 
1019
 
921
1020
        class StubSSHServer(stub_sftp.StubServer):
922
1021
 
923
1022
            test = self
926
1025
                self.test.command_executed.append(command)
927
1026
                proc = subprocess.Popen(
928
1027
                    command, shell=True, stdin=subprocess.PIPE,
929
 
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
1028
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
 
1029
                    bufsize=0)
930
1030
 
931
1031
                # XXX: horribly inefficient, not to mention ugly.
932
 
                # Start a thread for each of stdin/out/err, and relay bytes from
933
 
                # the subprocess to channel and vice versa.
 
1032
                # Start a thread for each of stdin/out/err, and relay bytes
 
1033
                # from the subprocess to channel and vice versa.
934
1034
                def ferry_bytes(read, write, close):
935
1035
                    while True:
936
1036
                        bytes = read(1)
937
 
                        if bytes == '':
 
1037
                        if bytes == b'':
938
1038
                            close()
939
1039
                            break
940
1040
                        write(bytes)
955
1055
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
956
1056
        # We *don't* want to override the default SSH vendor: the detected one
957
1057
        # is the one to use.
 
1058
 
 
1059
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
 
1060
        # inherits from SFTPServer which forces the SSH vendor to
 
1061
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
958
1062
        self.start_server(ssh_server)
959
 
        port = ssh_server._listener.port
 
1063
        port = ssh_server.port
960
1064
 
961
1065
        if sys.platform == 'win32':
962
 
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
 
1066
            bzr_remote_path = sys.executable + ' ' + self.get_brz_path()
963
1067
        else:
964
 
            bzr_remote_path = self.get_bzr_path()
965
 
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
1068
            bzr_remote_path = self.get_brz_path()
 
1069
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
966
1070
 
967
1071
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
968
1072
        # variable is used to tell bzr what command to run on the remote end.
977
1081
        t.mkdir('foo')
978
1082
 
979
1083
        self.assertEqual(
980
 
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
 
1084
            [b'%s serve --inet --directory=/ --allow-writes' %
 
1085
                bzr_remote_path.encode()],
981
1086
            self.command_executed)
982
1087
        # Make sure to disconnect, so that the remote process can stop, and we
983
1088
        # can cleanup. Then pause the test until everything is shutdown
989
1094
        # And the rest are threads
990
1095
        for t in started[1:]:
991
1096
            t.join()
 
1097
 
 
1098
 
 
1099
class TestUnhtml(tests.TestCase):
 
1100
 
 
1101
    """Tests for unhtml_roughly"""
 
1102
 
 
1103
    def test_truncation(self):
 
1104
        fake_html = "<p>something!\n" * 1000
 
1105
        result = http.unhtml_roughly(fake_html)
 
1106
        self.assertEqual(len(result), 1000)
 
1107
        self.assertStartsWith(result, " something!")