/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

(jelmer) Convert bzrlib.smtp_connection to use config stacks. (Jelmer
 Vernooij)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from cStringIO import StringIO
 
19
import errno
 
20
import os
 
21
import subprocess
 
22
import sys
 
23
import threading
 
24
 
 
25
from bzrlib import (
 
26
    errors,
 
27
    osutils,
 
28
    tests,
 
29
    transport,
 
30
    urlutils,
 
31
    )
 
32
from bzrlib.directory_service import directories
 
33
from bzrlib.transport import (
 
34
    chroot,
 
35
    fakenfs,
 
36
    http,
 
37
    local,
 
38
    location_to_url,
 
39
    memory,
 
40
    pathfilter,
 
41
    readonly,
 
42
    )
 
43
import bzrlib.transport.trace
 
44
from bzrlib.tests import (
 
45
    features,
 
46
    test_server,
 
47
    )
 
48
 
 
49
 
 
50
# TODO: Should possibly split transport-specific tests into their own files.
 
51
 
 
52
 
 
53
class TestTransport(tests.TestCase):
 
54
    """Test the non transport-concrete class functionality."""
 
55
 
 
56
    def test__get_set_protocol_handlers(self):
 
57
        handlers = transport._get_protocol_handlers()
 
58
        self.assertNotEqual([], handlers.keys())
 
59
        transport._clear_protocol_handlers()
 
60
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
61
        self.assertEqual([], transport._get_protocol_handlers().keys())
 
62
 
 
63
    def test_get_transport_modules(self):
 
64
        handlers = transport._get_protocol_handlers()
 
65
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
66
        # don't pollute the current handlers
 
67
        transport._clear_protocol_handlers()
 
68
 
 
69
        class SampleHandler(object):
 
70
            """I exist, isnt that enough?"""
 
71
        transport._clear_protocol_handlers()
 
72
        transport.register_transport_proto('foo')
 
73
        transport.register_lazy_transport('foo',
 
74
                                            'bzrlib.tests.test_transport',
 
75
                                            'TestTransport.SampleHandler')
 
76
        transport.register_transport_proto('bar')
 
77
        transport.register_lazy_transport('bar',
 
78
                                            'bzrlib.tests.test_transport',
 
79
                                            'TestTransport.SampleHandler')
 
80
        self.assertEqual([SampleHandler.__module__,
 
81
                            'bzrlib.transport.chroot',
 
82
                            'bzrlib.transport.pathfilter'],
 
83
                            transport._get_transport_modules())
 
84
 
 
85
    def test_transport_dependency(self):
 
86
        """Transport with missing dependency causes no error"""
 
87
        saved_handlers = transport._get_protocol_handlers()
 
88
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
89
        # don't pollute the current handlers
 
90
        transport._clear_protocol_handlers()
 
91
        transport.register_transport_proto('foo')
 
92
        transport.register_lazy_transport(
 
93
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
94
        try:
 
95
            transport.get_transport_from_url('foo://fooserver/foo')
 
96
        except errors.UnsupportedProtocol, e:
 
97
            e_str = str(e)
 
98
            self.assertEquals('Unsupported protocol'
 
99
                                ' for url "foo://fooserver/foo":'
 
100
                                ' Unable to import library "some_lib":'
 
101
                                ' testing missing dependency', str(e))
 
102
        else:
 
103
            self.fail('Did not raise UnsupportedProtocol')
 
104
 
 
105
    def test_transport_fallback(self):
 
106
        """Transport with missing dependency causes no error"""
 
107
        saved_handlers = transport._get_protocol_handlers()
 
108
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
109
        transport._clear_protocol_handlers()
 
110
        transport.register_transport_proto('foo')
 
111
        transport.register_lazy_transport(
 
112
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
113
        transport.register_lazy_transport(
 
114
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
115
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
116
        self.assertTrue(isinstance(t, BackupTransportHandler))
 
117
 
 
118
    def test_ssh_hints(self):
 
119
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
 
120
        try:
 
121
            transport.get_transport_from_url('ssh://fooserver/foo')
 
122
        except errors.UnsupportedProtocol, e:
 
123
            e_str = str(e)
 
124
            self.assertEquals('Unsupported protocol'
 
125
                              ' for url "ssh://fooserver/foo":'
 
126
                              ' bzr supports bzr+ssh to operate over ssh,'
 
127
                              ' use "bzr+ssh://fooserver/foo".',
 
128
                              str(e))
 
129
        else:
 
130
            self.fail('Did not raise UnsupportedProtocol')
 
131
 
 
132
    def test_LateReadError(self):
 
133
        """The LateReadError helper should raise on read()."""
 
134
        a_file = transport.LateReadError('a path')
 
135
        try:
 
136
            a_file.read()
 
137
        except errors.ReadError, error:
 
138
            self.assertEqual('a path', error.path)
 
139
        self.assertRaises(errors.ReadError, a_file.read, 40)
 
140
        a_file.close()
 
141
 
 
142
    def test_local_abspath_non_local_transport(self):
 
143
        # the base implementation should throw
 
144
        t = memory.MemoryTransport()
 
145
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
 
146
        self.assertEqual('memory:///t is not a local path.', str(e))
 
147
 
 
148
 
 
149
class TestCoalesceOffsets(tests.TestCase):
 
150
 
 
151
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
 
152
        coalesce = transport.Transport._coalesce_offsets
 
153
        exp = [transport._CoalescedOffset(*x) for x in expected]
 
154
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
 
155
                            max_size=max_size))
 
156
        self.assertEqual(exp, out)
 
157
 
 
158
    def test_coalesce_empty(self):
 
159
        self.check([], [])
 
160
 
 
161
    def test_coalesce_simple(self):
 
162
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
 
163
 
 
164
    def test_coalesce_unrelated(self):
 
165
        self.check([(0, 10, [(0, 10)]),
 
166
                    (20, 10, [(0, 10)]),
 
167
                   ], [(0, 10), (20, 10)])
 
168
 
 
169
    def test_coalesce_unsorted(self):
 
170
        self.check([(20, 10, [(0, 10)]),
 
171
                    (0, 10, [(0, 10)]),
 
172
                   ], [(20, 10), (0, 10)])
 
173
 
 
174
    def test_coalesce_nearby(self):
 
175
        self.check([(0, 20, [(0, 10), (10, 10)])],
 
176
                   [(0, 10), (10, 10)])
 
177
 
 
178
    def test_coalesce_overlapped(self):
 
179
        self.assertRaises(ValueError,
 
180
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
181
                        [(0, 10), (5, 10)])
 
182
 
 
183
    def test_coalesce_limit(self):
 
184
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
 
185
                              (30, 10), (40, 10)]),
 
186
                    (60, 50, [(0, 10), (10, 10), (20, 10),
 
187
                              (30, 10), (40, 10)]),
 
188
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
189
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
190
                       (90, 10), (100, 10)],
 
191
                    limit=5)
 
192
 
 
193
    def test_coalesce_no_limit(self):
 
194
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
 
195
                               (30, 10), (40, 10), (50, 10),
 
196
                               (60, 10), (70, 10), (80, 10),
 
197
                               (90, 10)]),
 
198
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
199
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
200
                       (90, 10), (100, 10)])
 
201
 
 
202
    def test_coalesce_fudge(self):
 
203
        self.check([(10, 30, [(0, 10), (20, 10)]),
 
204
                    (100, 10, [(0, 10)]),
 
205
                   ], [(10, 10), (30, 10), (100, 10)],
 
206
                   fudge=10)
 
207
 
 
208
    def test_coalesce_max_size(self):
 
209
        self.check([(10, 20, [(0, 10), (10, 10)]),
 
210
                    (30, 50, [(0, 50)]),
 
211
                    # If one range is above max_size, it gets its own coalesced
 
212
                    # offset
 
213
                    (100, 80, [(0, 80)]),],
 
214
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
 
215
                   max_size=50)
 
216
 
 
217
    def test_coalesce_no_max_size(self):
 
218
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
 
219
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
 
220
                  )
 
221
 
 
222
    def test_coalesce_default_limit(self):
 
223
        # By default we use a 100MB max size.
 
224
        ten_mb = 10 * 1024 * 1024
 
225
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
226
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
227
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
228
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
229
                   [(i * ten_mb, ten_mb) for i in range(11)],
 
230
                   max_size=1*1024*1024*1024)
 
231
 
 
232
 
 
233
class TestMemoryServer(tests.TestCase):
 
234
 
 
235
    def test_create_server(self):
 
236
        server = memory.MemoryServer()
 
237
        server.start_server()
 
238
        url = server.get_url()
 
239
        self.assertTrue(url in transport.transport_list_registry)
 
240
        t = transport.get_transport_from_url(url)
 
241
        del t
 
242
        server.stop_server()
 
243
        self.assertFalse(url in transport.transport_list_registry)
 
244
        self.assertRaises(errors.UnsupportedProtocol,
 
245
                          transport.get_transport, url)
 
246
 
 
247
 
 
248
class TestMemoryTransport(tests.TestCase):
 
249
 
 
250
    def test_get_transport(self):
 
251
        memory.MemoryTransport()
 
252
 
 
253
    def test_clone(self):
 
254
        t = memory.MemoryTransport()
 
255
        self.assertTrue(isinstance(t, memory.MemoryTransport))
 
256
        self.assertEqual("memory:///", t.clone("/").base)
 
257
 
 
258
    def test_abspath(self):
 
259
        t = memory.MemoryTransport()
 
260
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
 
261
 
 
262
    def test_abspath_of_root(self):
 
263
        t = memory.MemoryTransport()
 
264
        self.assertEqual("memory:///", t.base)
 
265
        self.assertEqual("memory:///", t.abspath('/'))
 
266
 
 
267
    def test_abspath_of_relpath_starting_at_root(self):
 
268
        t = memory.MemoryTransport()
 
269
        self.assertEqual("memory:///foo", t.abspath('/foo'))
 
270
 
 
271
    def test_append_and_get(self):
 
272
        t = memory.MemoryTransport()
 
273
        t.append_bytes('path', 'content')
 
274
        self.assertEqual(t.get('path').read(), 'content')
 
275
        t.append_file('path', StringIO('content'))
 
276
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
277
 
 
278
    def test_put_and_get(self):
 
279
        t = memory.MemoryTransport()
 
280
        t.put_file('path', StringIO('content'))
 
281
        self.assertEqual(t.get('path').read(), 'content')
 
282
        t.put_bytes('path', 'content')
 
283
        self.assertEqual(t.get('path').read(), 'content')
 
284
 
 
285
    def test_append_without_dir_fails(self):
 
286
        t = memory.MemoryTransport()
 
287
        self.assertRaises(errors.NoSuchFile,
 
288
                          t.append_bytes, 'dir/path', 'content')
 
289
 
 
290
    def test_put_without_dir_fails(self):
 
291
        t = memory.MemoryTransport()
 
292
        self.assertRaises(errors.NoSuchFile,
 
293
                          t.put_file, 'dir/path', StringIO('content'))
 
294
 
 
295
    def test_get_missing(self):
 
296
        transport = memory.MemoryTransport()
 
297
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
 
298
 
 
299
    def test_has_missing(self):
 
300
        t = memory.MemoryTransport()
 
301
        self.assertEquals(False, t.has('foo'))
 
302
 
 
303
    def test_has_present(self):
 
304
        t = memory.MemoryTransport()
 
305
        t.append_bytes('foo', 'content')
 
306
        self.assertEquals(True, t.has('foo'))
 
307
 
 
308
    def test_list_dir(self):
 
309
        t = memory.MemoryTransport()
 
310
        t.put_bytes('foo', 'content')
 
311
        t.mkdir('dir')
 
312
        t.put_bytes('dir/subfoo', 'content')
 
313
        t.put_bytes('dirlike', 'content')
 
314
 
 
315
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
316
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
317
 
 
318
    def test_mkdir(self):
 
319
        t = memory.MemoryTransport()
 
320
        t.mkdir('dir')
 
321
        t.append_bytes('dir/path', 'content')
 
322
        self.assertEqual(t.get('dir/path').read(), 'content')
 
323
 
 
324
    def test_mkdir_missing_parent(self):
 
325
        t = memory.MemoryTransport()
 
326
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
 
327
 
 
328
    def test_mkdir_twice(self):
 
329
        t = memory.MemoryTransport()
 
330
        t.mkdir('dir')
 
331
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
 
332
 
 
333
    def test_parameters(self):
 
334
        t = memory.MemoryTransport()
 
335
        self.assertEqual(True, t.listable())
 
336
        self.assertEqual(False, t.is_readonly())
 
337
 
 
338
    def test_iter_files_recursive(self):
 
339
        t = memory.MemoryTransport()
 
340
        t.mkdir('dir')
 
341
        t.put_bytes('dir/foo', 'content')
 
342
        t.put_bytes('dir/bar', 'content')
 
343
        t.put_bytes('bar', 'content')
 
344
        paths = set(t.iter_files_recursive())
 
345
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
346
 
 
347
    def test_stat(self):
 
348
        t = memory.MemoryTransport()
 
349
        t.put_bytes('foo', 'content')
 
350
        t.put_bytes('bar', 'phowar')
 
351
        self.assertEqual(7, t.stat('foo').st_size)
 
352
        self.assertEqual(6, t.stat('bar').st_size)
 
353
 
 
354
 
 
355
class ChrootDecoratorTransportTest(tests.TestCase):
 
356
    """Chroot decoration specific tests."""
 
357
 
 
358
    def test_abspath(self):
 
359
        # The abspath is always relative to the chroot_url.
 
360
        server = chroot.ChrootServer(
 
361
            transport.get_transport_from_url('memory:///foo/bar/'))
 
362
        self.start_server(server)
 
363
        t = transport.get_transport_from_url(server.get_url())
 
364
        self.assertEqual(server.get_url(), t.abspath('/'))
 
365
 
 
366
        subdir_t = t.clone('subdir')
 
367
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
368
 
 
369
    def test_clone(self):
 
370
        server = chroot.ChrootServer(
 
371
            transport.get_transport_from_url('memory:///foo/bar/'))
 
372
        self.start_server(server)
 
373
        t = transport.get_transport_from_url(server.get_url())
 
374
        # relpath from root and root path are the same
 
375
        relpath_cloned = t.clone('foo')
 
376
        abspath_cloned = t.clone('/foo')
 
377
        self.assertEqual(server, relpath_cloned.server)
 
378
        self.assertEqual(server, abspath_cloned.server)
 
379
 
 
380
    def test_chroot_url_preserves_chroot(self):
 
381
        """Calling get_transport on a chroot transport's base should produce a
 
382
        transport with exactly the same behaviour as the original chroot
 
383
        transport.
 
384
 
 
385
        This is so that it is not possible to escape a chroot by doing::
 
386
            url = chroot_transport.base
 
387
            parent_url = urlutils.join(url, '..')
 
388
            new_t = transport.get_transport_from_url(parent_url)
 
389
        """
 
390
        server = chroot.ChrootServer(
 
391
            transport.get_transport_from_url('memory:///path/subpath'))
 
392
        self.start_server(server)
 
393
        t = transport.get_transport_from_url(server.get_url())
 
394
        new_t = transport.get_transport_from_url(t.base)
 
395
        self.assertEqual(t.server, new_t.server)
 
396
        self.assertEqual(t.base, new_t.base)
 
397
 
 
398
    def test_urljoin_preserves_chroot(self):
 
399
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
 
400
        URL that escapes the intended chroot.
 
401
 
 
402
        This is so that it is not possible to escape a chroot by doing::
 
403
            url = chroot_transport.base
 
404
            parent_url = urlutils.join(url, '..')
 
405
            new_t = transport.get_transport_from_url(parent_url)
 
406
        """
 
407
        server = chroot.ChrootServer(
 
408
            transport.get_transport_from_url('memory:///path/'))
 
409
        self.start_server(server)
 
410
        t = transport.get_transport_from_url(server.get_url())
 
411
        self.assertRaises(
 
412
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
 
413
 
 
414
 
 
415
class TestChrootServer(tests.TestCase):
 
416
 
 
417
    def test_construct(self):
 
418
        backing_transport = memory.MemoryTransport()
 
419
        server = chroot.ChrootServer(backing_transport)
 
420
        self.assertEqual(backing_transport, server.backing_transport)
 
421
 
 
422
    def test_setUp(self):
 
423
        backing_transport = memory.MemoryTransport()
 
424
        server = chroot.ChrootServer(backing_transport)
 
425
        server.start_server()
 
426
        self.addCleanup(server.stop_server)
 
427
        self.assertTrue(server.scheme
 
428
                        in transport._get_protocol_handlers().keys())
 
429
 
 
430
    def test_stop_server(self):
 
431
        backing_transport = memory.MemoryTransport()
 
432
        server = chroot.ChrootServer(backing_transport)
 
433
        server.start_server()
 
434
        server.stop_server()
 
435
        self.assertFalse(server.scheme
 
436
                         in transport._get_protocol_handlers().keys())
 
437
 
 
438
    def test_get_url(self):
 
439
        backing_transport = memory.MemoryTransport()
 
440
        server = chroot.ChrootServer(backing_transport)
 
441
        server.start_server()
 
442
        self.addCleanup(server.stop_server)
 
443
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
444
 
 
445
 
 
446
class PathFilteringDecoratorTransportTest(tests.TestCase):
 
447
    """Pathfilter decoration specific tests."""
 
448
 
 
449
    def test_abspath(self):
 
450
        # The abspath is always relative to the base of the backing transport.
 
451
        server = pathfilter.PathFilteringServer(
 
452
            transport.get_transport_from_url('memory:///foo/bar/'),
 
453
            lambda x: x)
 
454
        server.start_server()
 
455
        t = transport.get_transport_from_url(server.get_url())
 
456
        self.assertEqual(server.get_url(), t.abspath('/'))
 
457
 
 
458
        subdir_t = t.clone('subdir')
 
459
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
460
        server.stop_server()
 
461
 
 
462
    def make_pf_transport(self, filter_func=None):
 
463
        """Make a PathFilteringTransport backed by a MemoryTransport.
 
464
 
 
465
        :param filter_func: by default this will be a no-op function.  Use this
 
466
            parameter to override it."""
 
467
        if filter_func is None:
 
468
            filter_func = lambda x: x
 
469
        server = pathfilter.PathFilteringServer(
 
470
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
 
471
        server.start_server()
 
472
        self.addCleanup(server.stop_server)
 
473
        return transport.get_transport_from_url(server.get_url())
 
474
 
 
475
    def test__filter(self):
 
476
        # _filter (with an identity func as filter_func) always returns
 
477
        # paths relative to the base of the backing transport.
 
478
        t = self.make_pf_transport()
 
479
        self.assertEqual('foo', t._filter('foo'))
 
480
        self.assertEqual('foo/bar', t._filter('foo/bar'))
 
481
        self.assertEqual('', t._filter('..'))
 
482
        self.assertEqual('', t._filter('/'))
 
483
        # The base of the pathfiltering transport is taken into account too.
 
484
        t = t.clone('subdir1/subdir2')
 
485
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
 
486
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
 
487
        self.assertEqual('subdir1', t._filter('..'))
 
488
        self.assertEqual('', t._filter('/'))
 
489
 
 
490
    def test_filter_invocation(self):
 
491
        filter_log = []
 
492
 
 
493
        def filter(path):
 
494
            filter_log.append(path)
 
495
            return path
 
496
        t = self.make_pf_transport(filter)
 
497
        t.has('abc')
 
498
        self.assertEqual(['abc'], filter_log)
 
499
        del filter_log[:]
 
500
        t.clone('abc').has('xyz')
 
501
        self.assertEqual(['abc/xyz'], filter_log)
 
502
        del filter_log[:]
 
503
        t.has('/abc')
 
504
        self.assertEqual(['abc'], filter_log)
 
505
 
 
506
    def test_clone(self):
 
507
        t = self.make_pf_transport()
 
508
        # relpath from root and root path are the same
 
509
        relpath_cloned = t.clone('foo')
 
510
        abspath_cloned = t.clone('/foo')
 
511
        self.assertEqual(t.server, relpath_cloned.server)
 
512
        self.assertEqual(t.server, abspath_cloned.server)
 
513
 
 
514
    def test_url_preserves_pathfiltering(self):
 
515
        """Calling get_transport on a pathfiltered transport's base should
 
516
        produce a transport with exactly the same behaviour as the original
 
517
        pathfiltered transport.
 
518
 
 
519
        This is so that it is not possible to escape (accidentally or
 
520
        otherwise) the filtering by doing::
 
521
            url = filtered_transport.base
 
522
            parent_url = urlutils.join(url, '..')
 
523
            new_t = transport.get_transport_from_url(parent_url)
 
524
        """
 
525
        t = self.make_pf_transport()
 
526
        new_t = transport.get_transport_from_url(t.base)
 
527
        self.assertEqual(t.server, new_t.server)
 
528
        self.assertEqual(t.base, new_t.base)
 
529
 
 
530
 
 
531
class ReadonlyDecoratorTransportTest(tests.TestCase):
 
532
    """Readonly decoration specific tests."""
 
533
 
 
534
    def test_local_parameters(self):
 
535
        # connect to . in readonly mode
 
536
        t = readonly.ReadonlyTransportDecorator('readonly+.')
 
537
        self.assertEqual(True, t.listable())
 
538
        self.assertEqual(True, t.is_readonly())
 
539
 
 
540
    def test_http_parameters(self):
 
541
        from bzrlib.tests.http_server import HttpServer
 
542
        # connect to '.' via http which is not listable
 
543
        server = HttpServer()
 
544
        self.start_server(server)
 
545
        t = transport.get_transport_from_url('readonly+' + server.get_url())
 
546
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
 
547
        self.assertEqual(False, t.listable())
 
548
        self.assertEqual(True, t.is_readonly())
 
549
 
 
550
 
 
551
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
 
552
    """NFS decorator specific tests."""
 
553
 
 
554
    def get_nfs_transport(self, url):
 
555
        # connect to url with nfs decoration
 
556
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
 
557
 
 
558
    def test_local_parameters(self):
 
559
        # the listable and is_readonly parameters
 
560
        # are not changed by the fakenfs decorator
 
561
        t = self.get_nfs_transport('.')
 
562
        self.assertEqual(True, t.listable())
 
563
        self.assertEqual(False, t.is_readonly())
 
564
 
 
565
    def test_http_parameters(self):
 
566
        # the listable and is_readonly parameters
 
567
        # are not changed by the fakenfs decorator
 
568
        from bzrlib.tests.http_server import HttpServer
 
569
        # connect to '.' via http which is not listable
 
570
        server = HttpServer()
 
571
        self.start_server(server)
 
572
        t = self.get_nfs_transport(server.get_url())
 
573
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
574
        self.assertEqual(False, t.listable())
 
575
        self.assertEqual(True, t.is_readonly())
 
576
 
 
577
    def test_fakenfs_server_default(self):
 
578
        # a FakeNFSServer() should bring up a local relpath server for itself
 
579
        server = test_server.FakeNFSServer()
 
580
        self.start_server(server)
 
581
        # the url should be decorated appropriately
 
582
        self.assertStartsWith(server.get_url(), 'fakenfs+')
 
583
        # and we should be able to get a transport for it
 
584
        t = transport.get_transport_from_url(server.get_url())
 
585
        # which must be a FakeNFSTransportDecorator instance.
 
586
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
587
 
 
588
    def test_fakenfs_rename_semantics(self):
 
589
        # a FakeNFS transport must mangle the way rename errors occur to
 
590
        # look like NFS problems.
 
591
        t = self.get_nfs_transport('.')
 
592
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
 
593
                        transport=t)
 
594
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
 
595
 
 
596
 
 
597
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
 
598
    """Tests for simulation of VFAT restrictions"""
 
599
 
 
600
    def get_vfat_transport(self, url):
 
601
        """Return vfat-backed transport for test directory"""
 
602
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
603
        return FakeVFATTransportDecorator('vfat+' + url)
 
604
 
 
605
    def test_transport_creation(self):
 
606
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
607
        t = self.get_vfat_transport('.')
 
608
        self.assertIsInstance(t, FakeVFATTransportDecorator)
 
609
 
 
610
    def test_transport_mkdir(self):
 
611
        t = self.get_vfat_transport('.')
 
612
        t.mkdir('HELLO')
 
613
        self.assertTrue(t.has('hello'))
 
614
        self.assertTrue(t.has('Hello'))
 
615
 
 
616
    def test_forbidden_chars(self):
 
617
        t = self.get_vfat_transport('.')
 
618
        self.assertRaises(ValueError, t.has, "<NU>")
 
619
 
 
620
 
 
621
class BadTransportHandler(transport.Transport):
 
622
    def __init__(self, base_url):
 
623
        raise errors.DependencyNotPresent('some_lib',
 
624
                                          'testing missing dependency')
 
625
 
 
626
 
 
627
class BackupTransportHandler(transport.Transport):
 
628
    """Test transport that works as a backup for the BadTransportHandler"""
 
629
    pass
 
630
 
 
631
 
 
632
class TestTransportImplementation(tests.TestCaseInTempDir):
 
633
    """Implementation verification for transports.
 
634
 
 
635
    To verify a transport we need a server factory, which is a callable
 
636
    that accepts no parameters and returns an implementation of
 
637
    bzrlib.transport.Server.
 
638
 
 
639
    That Server is then used to construct transport instances and test
 
640
    the transport via loopback activity.
 
641
 
 
642
    Currently this assumes that the Transport object is connected to the
 
643
    current working directory.  So that whatever is done
 
644
    through the transport, should show up in the working
 
645
    directory, and vice-versa. This is a bug, because its possible to have
 
646
    URL schemes which provide access to something that may not be
 
647
    result in storage on the local disk, i.e. due to file system limits, or
 
648
    due to it being a database or some other non-filesystem tool.
 
649
 
 
650
    This also tests to make sure that the functions work with both
 
651
    generators and lists (assuming iter(list) is effectively a generator)
 
652
    """
 
653
 
 
654
    def setUp(self):
 
655
        super(TestTransportImplementation, self).setUp()
 
656
        self._server = self.transport_server()
 
657
        self.start_server(self._server)
 
658
 
 
659
    def get_transport(self, relpath=None):
 
660
        """Return a connected transport to the local directory.
 
661
 
 
662
        :param relpath: a path relative to the base url.
 
663
        """
 
664
        base_url = self._server.get_url()
 
665
        url = self._adjust_url(base_url, relpath)
 
666
        # try getting the transport via the regular interface:
 
667
        t = transport.get_transport_from_url(url)
 
668
        # vila--20070607 if the following are commented out the test suite
 
669
        # still pass. Is this really still needed or was it a forgotten
 
670
        # temporary fix ?
 
671
        if not isinstance(t, self.transport_class):
 
672
            # we did not get the correct transport class type. Override the
 
673
            # regular connection behaviour by direct construction.
 
674
            t = self.transport_class(url)
 
675
        return t
 
676
 
 
677
 
 
678
class TestTransportFromPath(tests.TestCaseInTempDir):
 
679
 
 
680
    def test_with_path(self):
 
681
        t = transport.get_transport_from_path(self.test_dir)
 
682
        self.assertIsInstance(t, local.LocalTransport)
 
683
        self.assertEquals(t.base.rstrip("/"),
 
684
            urlutils.local_path_to_url(self.test_dir))
 
685
 
 
686
    def test_with_url(self):
 
687
        t = transport.get_transport_from_path("file:")
 
688
        self.assertIsInstance(t, local.LocalTransport)
 
689
        self.assertEquals(t.base.rstrip("/"),
 
690
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
691
 
 
692
 
 
693
class TestTransportFromUrl(tests.TestCaseInTempDir):
 
694
 
 
695
    def test_with_path(self):
 
696
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
 
697
            self.test_dir)
 
698
 
 
699
    def test_with_url(self):
 
700
        url = urlutils.local_path_to_url(self.test_dir)
 
701
        t = transport.get_transport_from_url(url)
 
702
        self.assertIsInstance(t, local.LocalTransport)
 
703
        self.assertEquals(t.base.rstrip("/"), url)
 
704
 
 
705
    def test_with_url_and_segment_parameters(self):
 
706
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
 
707
        t = transport.get_transport_from_url(url)
 
708
        self.assertIsInstance(t, local.LocalTransport)
 
709
        self.assertEquals(t.base.rstrip("/"), url)
 
710
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
 
711
            f.write("data")
 
712
        self.assertTrue(t.has("afile"))
 
713
 
 
714
 
 
715
class TestLocalTransports(tests.TestCase):
 
716
 
 
717
    def test_get_transport_from_abspath(self):
 
718
        here = osutils.abspath('.')
 
719
        t = transport.get_transport(here)
 
720
        self.assertIsInstance(t, local.LocalTransport)
 
721
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
722
 
 
723
    def test_get_transport_from_relpath(self):
 
724
        here = osutils.abspath('.')
 
725
        t = transport.get_transport('.')
 
726
        self.assertIsInstance(t, local.LocalTransport)
 
727
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
728
 
 
729
    def test_get_transport_from_local_url(self):
 
730
        here = osutils.abspath('.')
 
731
        here_url = urlutils.local_path_to_url(here) + '/'
 
732
        t = transport.get_transport(here_url)
 
733
        self.assertIsInstance(t, local.LocalTransport)
 
734
        self.assertEquals(t.base, here_url)
 
735
 
 
736
    def test_local_abspath(self):
 
737
        here = osutils.abspath('.')
 
738
        t = transport.get_transport(here)
 
739
        self.assertEquals(t.local_abspath(''), here)
 
740
 
 
741
 
 
742
class TestLocalTransportMutation(tests.TestCaseInTempDir):
 
743
 
 
744
    def test_local_transport_mkdir(self):
 
745
        here = osutils.abspath('.')
 
746
        t = transport.get_transport(here)
 
747
        t.mkdir('test')
 
748
        self.assertTrue(os.path.exists('test'))
 
749
 
 
750
    def test_local_transport_mkdir_permission_denied(self):
 
751
        # See https://bugs.launchpad.net/bzr/+bug/606537
 
752
        here = osutils.abspath('.')
 
753
        t = transport.get_transport(here)
 
754
        def fake_chmod(path, mode):
 
755
            e = OSError('permission denied')
 
756
            e.errno = errno.EPERM
 
757
            raise e
 
758
        self.overrideAttr(os, 'chmod', fake_chmod)
 
759
        t.mkdir('test')
 
760
        t.mkdir('test2', mode=0707)
 
761
        self.assertTrue(os.path.exists('test'))
 
762
        self.assertTrue(os.path.exists('test2'))
 
763
 
 
764
 
 
765
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
 
766
 
 
767
    def test_local_fdatasync_calls_fdatasync(self):
 
768
        """Check fdatasync on a stream tries to flush the data to the OS.
 
769
        
 
770
        We can't easily observe the external effect but we can at least see
 
771
        it's called.
 
772
        """
 
773
        sentinel = object()
 
774
        fdatasync = getattr(os, 'fdatasync', sentinel)
 
775
        if fdatasync is sentinel:
 
776
            raise tests.TestNotApplicable('fdatasync not supported')
 
777
        t = self.get_transport('.')
 
778
        calls = self.recordCalls(os, 'fdatasync')
 
779
        w = t.open_write_stream('out')
 
780
        w.write('foo')
 
781
        w.fdatasync()
 
782
        with open('out', 'rb') as f:
 
783
            # Should have been flushed.
 
784
            self.assertEquals(f.read(), 'foo')
 
785
        self.assertEquals(len(calls), 1, calls)
 
786
 
 
787
    def test_missing_directory(self):
 
788
        t = self.get_transport('.')
 
789
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
 
790
 
 
791
 
 
792
class TestWin32LocalTransport(tests.TestCase):
 
793
 
 
794
    def test_unc_clone_to_root(self):
 
795
        # Win32 UNC path like \\HOST\path
 
796
        # clone to root should stop at least at \\HOST part
 
797
        # not on \\
 
798
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
799
        for i in xrange(4):
 
800
            t = t.clone('..')
 
801
        self.assertEquals(t.base, 'file://HOST/')
 
802
        # make sure we reach the root
 
803
        t = t.clone('..')
 
804
        self.assertEquals(t.base, 'file://HOST/')
 
805
 
 
806
 
 
807
class TestConnectedTransport(tests.TestCase):
 
808
    """Tests for connected to remote server transports"""
 
809
 
 
810
    def test_parse_url(self):
 
811
        t = transport.ConnectedTransport(
 
812
            'http://simple.example.com/home/source')
 
813
        self.assertEquals(t._parsed_url.host, 'simple.example.com')
 
814
        self.assertEquals(t._parsed_url.port, None)
 
815
        self.assertEquals(t._parsed_url.path, '/home/source/')
 
816
        self.assertTrue(t._parsed_url.user is None)
 
817
        self.assertTrue(t._parsed_url.password is None)
 
818
 
 
819
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
 
820
 
 
821
    def test_parse_url_with_at_in_user(self):
 
822
        # Bug 228058
 
823
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
 
824
        self.assertEquals(t._parsed_url.user, 'user@host.com')
 
825
 
 
826
    def test_parse_quoted_url(self):
 
827
        t = transport.ConnectedTransport(
 
828
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
829
        self.assertEquals(t._parsed_url.host, 'exAmple.com')
 
830
        self.assertEquals(t._parsed_url.port, 2222)
 
831
        self.assertEquals(t._parsed_url.user, 'robey')
 
832
        self.assertEquals(t._parsed_url.password, 'h@t')
 
833
        self.assertEquals(t._parsed_url.path, '/path/')
 
834
 
 
835
        # Base should not keep track of the password
 
836
        self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
 
837
 
 
838
    def test_parse_invalid_url(self):
 
839
        self.assertRaises(errors.InvalidURL,
 
840
                          transport.ConnectedTransport,
 
841
                          'sftp://lily.org:~janneke/public/bzr/gub')
 
842
 
 
843
    def test_relpath(self):
 
844
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
 
845
 
 
846
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
 
847
            'sub')
 
848
        self.assertRaises(errors.PathNotChild, t.relpath,
 
849
                          'http://user@host.com/abs/path/sub')
 
850
        self.assertRaises(errors.PathNotChild, t.relpath,
 
851
                          'sftp://user2@host.com/abs/path/sub')
 
852
        self.assertRaises(errors.PathNotChild, t.relpath,
 
853
                          'sftp://user@otherhost.com/abs/path/sub')
 
854
        self.assertRaises(errors.PathNotChild, t.relpath,
 
855
                          'sftp://user@host.com:33/abs/path/sub')
 
856
        # Make sure it works when we don't supply a username
 
857
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
 
858
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
859
 
 
860
        # Make sure it works when parts of the path will be url encoded
 
861
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
 
862
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
863
 
 
864
    def test_connection_sharing_propagate_credentials(self):
 
865
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
 
866
        self.assertEquals('user', t._parsed_url.user)
 
867
        self.assertEquals('host.com', t._parsed_url.host)
 
868
        self.assertIs(None, t._get_connection())
 
869
        self.assertIs(None, t._parsed_url.password)
 
870
        c = t.clone('subdir')
 
871
        self.assertIs(None, c._get_connection())
 
872
        self.assertIs(None, t._parsed_url.password)
 
873
 
 
874
        # Simulate the user entering a password
 
875
        password = 'secret'
 
876
        connection = object()
 
877
        t._set_connection(connection, password)
 
878
        self.assertIs(connection, t._get_connection())
 
879
        self.assertIs(password, t._get_credentials())
 
880
        self.assertIs(connection, c._get_connection())
 
881
        self.assertIs(password, c._get_credentials())
 
882
 
 
883
        # credentials can be updated
 
884
        new_password = 'even more secret'
 
885
        c._update_credentials(new_password)
 
886
        self.assertIs(connection, t._get_connection())
 
887
        self.assertIs(new_password, t._get_credentials())
 
888
        self.assertIs(connection, c._get_connection())
 
889
        self.assertIs(new_password, c._get_credentials())
 
890
 
 
891
 
 
892
class TestReusedTransports(tests.TestCase):
 
893
    """Tests for transport reuse"""
 
894
 
 
895
    def test_reuse_same_transport(self):
 
896
        possible_transports = []
 
897
        t1 = transport.get_transport_from_url('http://foo/',
 
898
                                     possible_transports=possible_transports)
 
899
        self.assertEqual([t1], possible_transports)
 
900
        t2 = transport.get_transport_from_url('http://foo/',
 
901
                                     possible_transports=[t1])
 
902
        self.assertIs(t1, t2)
 
903
 
 
904
        # Also check that final '/' are handled correctly
 
905
        t3 = transport.get_transport_from_url('http://foo/path/')
 
906
        t4 = transport.get_transport_from_url('http://foo/path',
 
907
                                     possible_transports=[t3])
 
908
        self.assertIs(t3, t4)
 
909
 
 
910
        t5 = transport.get_transport_from_url('http://foo/path')
 
911
        t6 = transport.get_transport_from_url('http://foo/path/',
 
912
                                     possible_transports=[t5])
 
913
        self.assertIs(t5, t6)
 
914
 
 
915
    def test_don_t_reuse_different_transport(self):
 
916
        t1 = transport.get_transport_from_url('http://foo/path')
 
917
        t2 = transport.get_transport_from_url('http://bar/path',
 
918
                                     possible_transports=[t1])
 
919
        self.assertIsNot(t1, t2)
 
920
 
 
921
 
 
922
class TestTransportTrace(tests.TestCase):
 
923
 
 
924
    def test_decorator(self):
 
925
        t = transport.get_transport_from_url('trace+memory://')
 
926
        self.assertIsInstance(
 
927
            t, bzrlib.transport.trace.TransportTraceDecorator)
 
928
 
 
929
    def test_clone_preserves_activity(self):
 
930
        t = transport.get_transport_from_url('trace+memory://')
 
931
        t2 = t.clone('.')
 
932
        self.assertTrue(t is not t2)
 
933
        self.assertTrue(t._activity is t2._activity)
 
934
 
 
935
    # the following specific tests are for the operations that have made use of
 
936
    # logging in tests; we could test every single operation but doing that
 
937
    # still won't cause a test failure when the top level Transport API
 
938
    # changes; so there is little return doing that.
 
939
    def test_get(self):
 
940
        t = transport.get_transport_from_url('trace+memory:///')
 
941
        t.put_bytes('foo', 'barish')
 
942
        t.get('foo')
 
943
        expected_result = []
 
944
        # put_bytes records the bytes, not the content to avoid memory
 
945
        # pressure.
 
946
        expected_result.append(('put_bytes', 'foo', 6, None))
 
947
        # get records the file name only.
 
948
        expected_result.append(('get', 'foo'))
 
949
        self.assertEqual(expected_result, t._activity)
 
950
 
 
951
    def test_readv(self):
 
952
        t = transport.get_transport_from_url('trace+memory:///')
 
953
        t.put_bytes('foo', 'barish')
 
954
        list(t.readv('foo', [(0, 1), (3, 2)],
 
955
                     adjust_for_latency=True, upper_limit=6))
 
956
        expected_result = []
 
957
        # put_bytes records the bytes, not the content to avoid memory
 
958
        # pressure.
 
959
        expected_result.append(('put_bytes', 'foo', 6, None))
 
960
        # readv records the supplied offset request
 
961
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
 
962
        self.assertEqual(expected_result, t._activity)
 
963
 
 
964
 
 
965
class TestSSHConnections(tests.TestCaseWithTransport):
 
966
 
 
967
    def test_bzr_connect_to_bzr_ssh(self):
 
968
        """get_transport of a bzr+ssh:// behaves correctly.
 
969
 
 
970
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
 
971
        """
 
972
        # This test actually causes a bzr instance to be invoked, which is very
 
973
        # expensive: it should be the only such test in the test suite.
 
974
        # A reasonable evolution for this would be to simply check inside
 
975
        # check_channel_exec_request that the command is appropriate, and then
 
976
        # satisfy requests in-process.
 
977
        self.requireFeature(features.paramiko)
 
978
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
 
979
        # override the interface (doesn't change self._vendor).
 
980
        # Note that this does encryption, so can be slow.
 
981
        from bzrlib.tests import stub_sftp
 
982
 
 
983
        # Start an SSH server
 
984
        self.command_executed = []
 
985
        # XXX: This is horrible -- we define a really dumb SSH server that
 
986
        # executes commands, and manage the hooking up of stdin/out/err to the
 
987
        # SSH channel ourselves.  Surely this has already been implemented
 
988
        # elsewhere?
 
989
        started = []
 
990
 
 
991
        class StubSSHServer(stub_sftp.StubServer):
 
992
 
 
993
            test = self
 
994
 
 
995
            def check_channel_exec_request(self, channel, command):
 
996
                self.test.command_executed.append(command)
 
997
                proc = subprocess.Popen(
 
998
                    command, shell=True, stdin=subprocess.PIPE,
 
999
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
1000
 
 
1001
                # XXX: horribly inefficient, not to mention ugly.
 
1002
                # Start a thread for each of stdin/out/err, and relay bytes
 
1003
                # from the subprocess to channel and vice versa.
 
1004
                def ferry_bytes(read, write, close):
 
1005
                    while True:
 
1006
                        bytes = read(1)
 
1007
                        if bytes == '':
 
1008
                            close()
 
1009
                            break
 
1010
                        write(bytes)
 
1011
 
 
1012
                file_functions = [
 
1013
                    (channel.recv, proc.stdin.write, proc.stdin.close),
 
1014
                    (proc.stdout.read, channel.sendall, channel.close),
 
1015
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
 
1016
                started.append(proc)
 
1017
                for read, write, close in file_functions:
 
1018
                    t = threading.Thread(
 
1019
                        target=ferry_bytes, args=(read, write, close))
 
1020
                    t.start()
 
1021
                    started.append(t)
 
1022
 
 
1023
                return True
 
1024
 
 
1025
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
 
1026
        # We *don't* want to override the default SSH vendor: the detected one
 
1027
        # is the one to use.
 
1028
 
 
1029
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
 
1030
        # inherits from SFTPServer which forces the SSH vendor to
 
1031
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
 
1032
        self.start_server(ssh_server)
 
1033
        port = ssh_server.port
 
1034
 
 
1035
        if sys.platform == 'win32':
 
1036
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
 
1037
        else:
 
1038
            bzr_remote_path = self.get_bzr_path()
 
1039
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
 
1040
 
 
1041
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
 
1042
        # variable is used to tell bzr what command to run on the remote end.
 
1043
        path_to_branch = osutils.abspath('.')
 
1044
        if sys.platform == 'win32':
 
1045
            # On Windows, we export all drives as '/C:/, etc. So we need to
 
1046
            # prefix a '/' to get the right path.
 
1047
            path_to_branch = '/' + path_to_branch
 
1048
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
 
1049
        t = transport.get_transport(url)
 
1050
        self.permit_url(t.base)
 
1051
        t.mkdir('foo')
 
1052
 
 
1053
        self.assertEqual(
 
1054
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
 
1055
            self.command_executed)
 
1056
        # Make sure to disconnect, so that the remote process can stop, and we
 
1057
        # can cleanup. Then pause the test until everything is shutdown
 
1058
        t._client._medium.disconnect()
 
1059
        if not started:
 
1060
            return
 
1061
        # First wait for the subprocess
 
1062
        started[0].wait()
 
1063
        # And the rest are threads
 
1064
        for t in started[1:]:
 
1065
            t.join()
 
1066
 
 
1067
 
 
1068
class TestUnhtml(tests.TestCase):
 
1069
 
 
1070
    """Tests for unhtml_roughly"""
 
1071
 
 
1072
    def test_truncation(self):
 
1073
        fake_html = "<p>something!\n" * 1000
 
1074
        result = http.unhtml_roughly(fake_html)
 
1075
        self.assertEquals(len(result), 1000)
 
1076
        self.assertStartsWith(result, " something!")
 
1077
 
 
1078
 
 
1079
class SomeDirectory(object):
 
1080
 
 
1081
    def look_up(self, name, url):
 
1082
        return "http://bar"
 
1083
 
 
1084
 
 
1085
class TestLocationToUrl(tests.TestCase):
 
1086
 
 
1087
    def get_base_location(self):
 
1088
        path = osutils.abspath('/foo/bar')
 
1089
        if path.startswith('/'):
 
1090
            url = 'file://%s' % (path,)
 
1091
        else:
 
1092
            # On Windows, abspaths start with the drive letter, so we have to
 
1093
            # add in the extra '/'
 
1094
            url = 'file:///%s' % (path,)
 
1095
        return path, url
 
1096
 
 
1097
    def test_regular_url(self):
 
1098
        self.assertEquals("file://foo", location_to_url("file://foo"))
 
1099
 
 
1100
    def test_directory(self):
 
1101
        directories.register("bar:", SomeDirectory, "Dummy directory")
 
1102
        self.addCleanup(directories.remove, "bar:")
 
1103
        self.assertEquals("http://bar", location_to_url("bar:"))
 
1104
 
 
1105
    def test_unicode_url(self):
 
1106
        self.assertRaises(errors.InvalidURL, location_to_url,
 
1107
            "http://fo/\xc3\xaf".decode("utf-8"))
 
1108
 
 
1109
    def test_unicode_path(self):
 
1110
        path, url = self.get_base_location()
 
1111
        location = path + "\xc3\xaf".decode("utf-8")
 
1112
        url += '%C3%AF'
 
1113
        self.assertEquals(url, location_to_url(location))
 
1114
 
 
1115
    def test_path(self):
 
1116
        path, url = self.get_base_location()
 
1117
        self.assertEquals(url, location_to_url(path))
 
1118
 
 
1119
    def test_relative_file_url(self):
 
1120
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
 
1121
            location_to_url("file:bar"))
 
1122
 
 
1123
    def test_absolute_file_url(self):
 
1124
        self.assertEquals("file:///bar", location_to_url("file:/bar"))