/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/stub_sftp.py

  • Committer: Jelmer Vernooij
  • Date: 2020-03-22 01:35:14 UTC
  • mfrom: (7490.7.6 work)
  • mto: This revision was merged to the branch mainline in revision 7499.
  • Revision ID: jelmer@jelmer.uk-20200322013514-7vw1ntwho04rcuj3
merge lp:brz/3.1.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2008-2011 Robey Pointer <robey@lag.net>, Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""
 
18
A stub SFTP server for loopback SFTP testing.
 
19
Adapted from the one in paramiko's unit tests.
 
20
"""
 
21
 
 
22
import os
 
23
import paramiko
 
24
import socket
 
25
import socketserver
 
26
import sys
 
27
import time
 
28
 
 
29
from .. import (
 
30
    osutils,
 
31
    trace,
 
32
    urlutils,
 
33
    )
 
34
from ..transport import (
 
35
    ssh,
 
36
    )
 
37
from . import test_server
 
38
 
 
39
 
 
40
class StubServer(paramiko.ServerInterface):
 
41
 
 
42
    def __init__(self, test_case_server):
 
43
        paramiko.ServerInterface.__init__(self)
 
44
        self.log = test_case_server.log
 
45
 
 
46
    def check_auth_password(self, username, password):
 
47
        # all are allowed
 
48
        self.log('sftpserver - authorizing: %s' % (username,))
 
49
        return paramiko.AUTH_SUCCESSFUL
 
50
 
 
51
    def check_channel_request(self, kind, chanid):
 
52
        self.log('sftpserver - channel request: %s, %s' % (kind, chanid))
 
53
        return paramiko.OPEN_SUCCEEDED
 
54
 
 
55
 
 
56
class StubSFTPHandle(paramiko.SFTPHandle):
 
57
 
 
58
    def stat(self):
 
59
        try:
 
60
            return paramiko.SFTPAttributes.from_stat(
 
61
                os.fstat(self.readfile.fileno()))
 
62
        except OSError as e:
 
63
            return paramiko.SFTPServer.convert_errno(e.errno)
 
64
 
 
65
    def chattr(self, attr):
 
66
        # python doesn't have equivalents to fchown or fchmod, so we have to
 
67
        # use the stored filename
 
68
        trace.mutter('Changing permissions on %s to %s', self.filename, attr)
 
69
        try:
 
70
            paramiko.SFTPServer.set_file_attr(self.filename, attr)
 
71
        except OSError as e:
 
72
            return paramiko.SFTPServer.convert_errno(e.errno)
 
73
 
 
74
 
 
75
class StubSFTPServer(paramiko.SFTPServerInterface):
 
76
 
 
77
    def __init__(self, server, root, home=None):
 
78
        paramiko.SFTPServerInterface.__init__(self, server)
 
79
        # All paths are actually relative to 'root'.
 
80
        # this is like implementing chroot().
 
81
        self.root = root
 
82
        if home is None:
 
83
            self.home = ''
 
84
        else:
 
85
            if not home.startswith(self.root):
 
86
                raise AssertionError(
 
87
                    "home must be a subdirectory of root (%s vs %s)"
 
88
                    % (home, root))
 
89
            self.home = home[len(self.root):]
 
90
        if self.home.startswith('/'):
 
91
            self.home = self.home[1:]
 
92
        server.log('sftpserver - new connection')
 
93
 
 
94
    def _realpath(self, path):
 
95
        # paths returned from self.canonicalize() always start with
 
96
        # a path separator. So if 'root' is just '/', this would cause
 
97
        # a double slash at the beginning '//home/dir'.
 
98
        if self.root == '/':
 
99
            return self.canonicalize(path)
 
100
        return self.root + self.canonicalize(path)
 
101
 
 
102
    if sys.platform == 'win32':
 
103
        def canonicalize(self, path):
 
104
            # Win32 sftp paths end up looking like
 
105
            #     sftp://host@foo/h:/foo/bar
 
106
            # which means absolute paths look like:
 
107
            #     /h:/foo/bar
 
108
            # and relative paths stay the same:
 
109
            #     foo/bar
 
110
            # win32 needs to use the Unicode APIs. so we require the
 
111
            # paths to be utf8 (Linux just uses bytestreams)
 
112
            thispath = path.decode('utf8')
 
113
            if path.startswith('/'):
 
114
                # Abspath H:/foo/bar
 
115
                return os.path.normpath(thispath[1:])
 
116
            else:
 
117
                return os.path.normpath(os.path.join(self.home, thispath))
 
118
    else:
 
119
        def canonicalize(self, path):
 
120
            if os.path.isabs(path):
 
121
                return osutils.normpath(path)
 
122
            else:
 
123
                return osutils.normpath('/' + os.path.join(self.home, path))
 
124
 
 
125
    def chattr(self, path, attr):
 
126
        try:
 
127
            paramiko.SFTPServer.set_file_attr(path, attr)
 
128
        except OSError as e:
 
129
            return paramiko.SFTPServer.convert_errno(e.errno)
 
130
        return paramiko.SFTP_OK
 
131
 
 
132
    def list_folder(self, path):
 
133
        path = self._realpath(path)
 
134
        try:
 
135
            out = []
 
136
            # TODO: win32 incorrectly lists paths with non-ascii if path is not
 
137
            # unicode. However on unix the server should only deal with
 
138
            # bytestreams and posix.listdir does the right thing
 
139
            if sys.platform == 'win32':
 
140
                flist = [f.encode('utf8') for f in os.listdir(path)]
 
141
            else:
 
142
                flist = os.listdir(path)
 
143
            for fname in flist:
 
144
                attr = paramiko.SFTPAttributes.from_stat(
 
145
                    os.stat(osutils.pathjoin(path, fname)))
 
146
                attr.filename = fname
 
147
                out.append(attr)
 
148
            return out
 
149
        except OSError as e:
 
150
            return paramiko.SFTPServer.convert_errno(e.errno)
 
151
 
 
152
    def stat(self, path):
 
153
        path = self._realpath(path)
 
154
        try:
 
155
            return paramiko.SFTPAttributes.from_stat(os.stat(path))
 
156
        except OSError as e:
 
157
            return paramiko.SFTPServer.convert_errno(e.errno)
 
158
 
 
159
    def lstat(self, path):
 
160
        path = self._realpath(path)
 
161
        try:
 
162
            return paramiko.SFTPAttributes.from_stat(os.lstat(path))
 
163
        except OSError as e:
 
164
            return paramiko.SFTPServer.convert_errno(e.errno)
 
165
 
 
166
    def open(self, path, flags, attr):
 
167
        path = self._realpath(path)
 
168
        try:
 
169
            flags |= getattr(os, 'O_BINARY', 0)
 
170
            if getattr(attr, 'st_mode', None):
 
171
                fd = os.open(path, flags, attr.st_mode)
 
172
            else:
 
173
                # os.open() defaults to 0777 which is
 
174
                # an odd default mode for files
 
175
                fd = os.open(path, flags, 0o666)
 
176
        except OSError as e:
 
177
            return paramiko.SFTPServer.convert_errno(e.errno)
 
178
 
 
179
        if (flags & os.O_CREAT) and (attr is not None):
 
180
            attr._flags &= ~attr.FLAG_PERMISSIONS
 
181
            paramiko.SFTPServer.set_file_attr(path, attr)
 
182
        if flags & os.O_WRONLY:
 
183
            fstr = 'wb'
 
184
        elif flags & os.O_RDWR:
 
185
            fstr = 'rb+'
 
186
        else:
 
187
            # O_RDONLY (== 0)
 
188
            fstr = 'rb'
 
189
        try:
 
190
            f = os.fdopen(fd, fstr)
 
191
        except (IOError, OSError) as e:
 
192
            return paramiko.SFTPServer.convert_errno(e.errno)
 
193
        fobj = StubSFTPHandle()
 
194
        fobj.filename = path
 
195
        fobj.readfile = f
 
196
        fobj.writefile = f
 
197
        return fobj
 
198
 
 
199
    def remove(self, path):
 
200
        path = self._realpath(path)
 
201
        try:
 
202
            os.remove(path)
 
203
        except OSError as e:
 
204
            return paramiko.SFTPServer.convert_errno(e.errno)
 
205
        return paramiko.SFTP_OK
 
206
 
 
207
    def rename(self, oldpath, newpath):
 
208
        oldpath = self._realpath(oldpath)
 
209
        newpath = self._realpath(newpath)
 
210
        try:
 
211
            os.rename(oldpath, newpath)
 
212
        except OSError as e:
 
213
            return paramiko.SFTPServer.convert_errno(e.errno)
 
214
        return paramiko.SFTP_OK
 
215
 
 
216
    def symlink(self, target_path, path):
 
217
        path = self._realpath(path)
 
218
        try:
 
219
            os.symlink(target_path, path)
 
220
        except OSError as e:
 
221
            return paramiko.SFTPServer.convert_errno(e.errno)
 
222
        return paramiko.SFTP_OK
 
223
 
 
224
    def readlink(self, path):
 
225
        path = self._realpath(path)
 
226
        try:
 
227
            target_path = os.readlink(path)
 
228
        except OSError as e:
 
229
            return paramiko.SFTPServer.convert_errno(e.errno)
 
230
        return target_path
 
231
 
 
232
    def mkdir(self, path, attr):
 
233
        path = self._realpath(path)
 
234
        try:
 
235
            # Using getattr() in case st_mode is None or 0
 
236
            # both evaluate to False
 
237
            if getattr(attr, 'st_mode', None):
 
238
                os.mkdir(path, attr.st_mode)
 
239
            else:
 
240
                os.mkdir(path)
 
241
            if attr is not None:
 
242
                attr._flags &= ~attr.FLAG_PERMISSIONS
 
243
                paramiko.SFTPServer.set_file_attr(path, attr)
 
244
        except OSError as e:
 
245
            return paramiko.SFTPServer.convert_errno(e.errno)
 
246
        return paramiko.SFTP_OK
 
247
 
 
248
    def rmdir(self, path):
 
249
        path = self._realpath(path)
 
250
        try:
 
251
            os.rmdir(path)
 
252
        except OSError as e:
 
253
            return paramiko.SFTPServer.convert_errno(e.errno)
 
254
        return paramiko.SFTP_OK
 
255
 
 
256
    # removed: chattr
 
257
    # (nothing in bzr's sftp transport uses those)
 
258
 
 
259
 
 
260
# ------------- server test implementation --------------
 
261
 
 
262
STUB_SERVER_KEY = """\
 
263
-----BEGIN RSA PRIVATE KEY-----
 
264
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
265
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
266
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
267
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
268
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
269
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
270
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
271
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
272
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
273
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
274
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
275
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
276
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
277
-----END RSA PRIVATE KEY-----
 
278
"""
 
279
 
 
280
 
 
281
class SocketDelay(object):
 
282
    """A socket decorator to make TCP appear slower.
 
283
 
 
284
    This changes recv, send, and sendall to add a fixed latency to each python
 
285
    call if a new roundtrip is detected. That is, when a recv is called and the
 
286
    flag new_roundtrip is set, latency is charged. Every send and send_all
 
287
    sets this flag.
 
288
 
 
289
    In addition every send, sendall and recv sleeps a bit per character send to
 
290
    simulate bandwidth.
 
291
 
 
292
    Not all methods are implemented, this is deliberate as this class is not a
 
293
    replacement for the builtin sockets layer. fileno is not implemented to
 
294
    prevent the proxy being bypassed.
 
295
    """
 
296
 
 
297
    simulated_time = 0
 
298
    _proxied_arguments = dict.fromkeys([
 
299
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
 
300
        "setblocking", "setsockopt", "settimeout", "shutdown"])
 
301
 
 
302
    def __init__(self, sock, latency, bandwidth=1.0,
 
303
                 really_sleep=True):
 
304
        """
 
305
        :param bandwith: simulated bandwith (MegaBit)
 
306
        :param really_sleep: If set to false, the SocketDelay will just
 
307
        increase a counter, instead of calling time.sleep. This is useful for
 
308
        unittesting the SocketDelay.
 
309
        """
 
310
        self.sock = sock
 
311
        self.latency = latency
 
312
        self.really_sleep = really_sleep
 
313
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024)
 
314
        self.new_roundtrip = False
 
315
 
 
316
    def sleep(self, s):
 
317
        if self.really_sleep:
 
318
            time.sleep(s)
 
319
        else:
 
320
            SocketDelay.simulated_time += s
 
321
 
 
322
    def __getattr__(self, attr):
 
323
        if attr in SocketDelay._proxied_arguments:
 
324
            return getattr(self.sock, attr)
 
325
        raise AttributeError("'SocketDelay' object has no attribute %r" %
 
326
                             attr)
 
327
 
 
328
    def dup(self):
 
329
        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
 
330
                           self._sleep)
 
331
 
 
332
    def recv(self, *args):
 
333
        data = self.sock.recv(*args)
 
334
        if data and self.new_roundtrip:
 
335
            self.new_roundtrip = False
 
336
            self.sleep(self.latency)
 
337
        self.sleep(len(data) * self.time_per_byte)
 
338
        return data
 
339
 
 
340
    def sendall(self, data, flags=0):
 
341
        if not self.new_roundtrip:
 
342
            self.new_roundtrip = True
 
343
            self.sleep(self.latency)
 
344
        self.sleep(len(data) * self.time_per_byte)
 
345
        return self.sock.sendall(data, flags)
 
346
 
 
347
    def send(self, data, flags=0):
 
348
        if not self.new_roundtrip:
 
349
            self.new_roundtrip = True
 
350
            self.sleep(self.latency)
 
351
        bytes_sent = self.sock.send(data, flags)
 
352
        self.sleep(bytes_sent * self.time_per_byte)
 
353
        return bytes_sent
 
354
 
 
355
 
 
356
class TestingSFTPConnectionHandler(socketserver.BaseRequestHandler):
 
357
 
 
358
    def setup(self):
 
359
        self.wrap_for_latency()
 
360
        tcs = self.server.test_case_server
 
361
        ptrans = paramiko.Transport(self.request)
 
362
        self.paramiko_transport = ptrans
 
363
        # Set it to a channel under 'bzr' so that we get debug info
 
364
        ptrans.set_log_channel('brz.paramiko.transport')
 
365
        ptrans.add_server_key(tcs.get_host_key())
 
366
        ptrans.set_subsystem_handler('sftp', paramiko.SFTPServer,
 
367
                                     StubSFTPServer, root=tcs._root,
 
368
                                     home=tcs._server_homedir)
 
369
        server = tcs._server_interface(tcs)
 
370
        # This blocks until the key exchange has been done
 
371
        ptrans.start_server(None, server)
 
372
 
 
373
    def finish(self):
 
374
        # Wait for the conversation to finish, when the paramiko.Transport
 
375
        # thread finishes
 
376
        # TODO: Consider timing out after XX seconds rather than hanging.
 
377
        #       Also we could check paramiko_transport.active and possibly
 
378
        #       paramiko_transport.getException().
 
379
        self.paramiko_transport.join()
 
380
 
 
381
    def wrap_for_latency(self):
 
382
        tcs = self.server.test_case_server
 
383
        if tcs.add_latency:
 
384
            # Give the socket (which the request really is) a latency adding
 
385
            # decorator.
 
386
            self.request = SocketDelay(self.request, tcs.add_latency)
 
387
 
 
388
 
 
389
class TestingSFTPWithoutSSHConnectionHandler(TestingSFTPConnectionHandler):
 
390
 
 
391
    def setup(self):
 
392
        self.wrap_for_latency()
 
393
        # Re-import these as locals, so that they're still accessible during
 
394
        # interpreter shutdown (when all module globals get set to None, leading
 
395
        # to confusing errors like "'NoneType' object has no attribute 'error'".
 
396
 
 
397
        class FakeChannel(object):
 
398
            def get_transport(self):
 
399
                return self
 
400
 
 
401
            def get_log_channel(self):
 
402
                return 'brz.paramiko'
 
403
 
 
404
            def get_name(self):
 
405
                return '1'
 
406
 
 
407
            def get_hexdump(self):
 
408
                return False
 
409
 
 
410
            def close(self):
 
411
                pass
 
412
 
 
413
        tcs = self.server.test_case_server
 
414
        sftp_server = paramiko.SFTPServer(
 
415
            FakeChannel(), 'sftp', StubServer(tcs), StubSFTPServer,
 
416
            root=tcs._root, home=tcs._server_homedir)
 
417
        self.sftp_server = sftp_server
 
418
        sys_stderr = sys.stderr  # Used in error reporting during shutdown
 
419
        try:
 
420
            sftp_server.start_subsystem(
 
421
                'sftp', None, ssh.SocketAsChannelAdapter(self.request))
 
422
        except socket.error as e:
 
423
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
 
424
                # it's okay for the client to disconnect abruptly
 
425
                # (bug in paramiko 1.6: it should absorb this exception)
 
426
                pass
 
427
            else:
 
428
                raise
 
429
        except Exception as e:
 
430
            # This typically seems to happen during interpreter shutdown, so
 
431
            # most of the useful ways to report this error won't work.
 
432
            # Writing the exception type, and then the text of the exception,
 
433
            # seems to be the best we can do.
 
434
            # FIXME: All interpreter shutdown errors should have been related
 
435
            # to daemon threads, cleanup needed -- vila 20100623
 
436
            sys_stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
 
437
            sys_stderr.write('%s\n\n' % (e,))
 
438
 
 
439
    def finish(self):
 
440
        self.sftp_server.finish_subsystem()
 
441
 
 
442
 
 
443
class TestingSFTPServer(test_server.TestingThreadingTCPServer):
 
444
 
 
445
    def __init__(self, server_address, request_handler_class, test_case_server):
 
446
        test_server.TestingThreadingTCPServer.__init__(
 
447
            self, server_address, request_handler_class)
 
448
        self.test_case_server = test_case_server
 
449
 
 
450
 
 
451
class SFTPServer(test_server.TestingTCPServerInAThread):
 
452
    """Common code for SFTP server facilities."""
 
453
 
 
454
    def __init__(self, server_interface=StubServer):
 
455
        self.host = '127.0.0.1'
 
456
        self.port = 0
 
457
        super(SFTPServer, self).__init__((self.host, self.port),
 
458
                                         TestingSFTPServer,
 
459
                                         TestingSFTPConnectionHandler)
 
460
        self._original_vendor = None
 
461
        self._vendor = ssh.ParamikoVendor()
 
462
        self._server_interface = server_interface
 
463
        self._host_key = None
 
464
        self.logs = []
 
465
        self.add_latency = 0
 
466
        self._homedir = None
 
467
        self._server_homedir = None
 
468
        self._root = None
 
469
 
 
470
    def _get_sftp_url(self, path):
 
471
        """Calculate an sftp url to this server for path."""
 
472
        return "sftp://foo:bar@%s:%s/%s" % (self.host, self.port, path)
 
473
 
 
474
    def log(self, message):
 
475
        """StubServer uses this to log when a new server is created."""
 
476
        self.logs.append(message)
 
477
 
 
478
    def create_server(self):
 
479
        server = self.server_class((self.host, self.port),
 
480
                                   self.request_handler_class,
 
481
                                   self)
 
482
        return server
 
483
 
 
484
    def get_host_key(self):
 
485
        if self._host_key is None:
 
486
            key_file = osutils.pathjoin(self._homedir, 'test_rsa.key')
 
487
            f = open(key_file, 'w')
 
488
            try:
 
489
                f.write(STUB_SERVER_KEY)
 
490
            finally:
 
491
                f.close()
 
492
            self._host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
493
        return self._host_key
 
494
 
 
495
    def start_server(self, backing_server=None):
 
496
        # XXX: TODO: make sftpserver back onto backing_server rather than local
 
497
        # disk.
 
498
        if not (backing_server is None
 
499
                or isinstance(backing_server, test_server.LocalURLServer)):
 
500
            raise AssertionError(
 
501
                'backing_server should not be %r, because this can only serve '
 
502
                'the local current working directory.' % (backing_server,))
 
503
        self._original_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
 
504
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._vendor
 
505
        self._homedir = osutils.getcwd()
 
506
        if sys.platform == 'win32':
 
507
            # Normalize the path or it will be wrongly escaped
 
508
            self._homedir = osutils.normpath(self._homedir)
 
509
        else:
 
510
            self._homedir = self._homedir
 
511
        if self._server_homedir is None:
 
512
            self._server_homedir = self._homedir
 
513
        self._root = '/'
 
514
        if sys.platform == 'win32':
 
515
            self._root = ''
 
516
        super(SFTPServer, self).start_server()
 
517
 
 
518
    def stop_server(self):
 
519
        try:
 
520
            super(SFTPServer, self).stop_server()
 
521
        finally:
 
522
            ssh._ssh_vendor_manager._cached_ssh_vendor = self._original_vendor
 
523
 
 
524
    def get_bogus_url(self):
 
525
        """See breezy.transport.Server.get_bogus_url."""
 
526
        # this is chosen to try to prevent trouble with proxies, weird dns, etc
 
527
        # we bind a random socket, so that we get a guaranteed unused port
 
528
        # we just never listen on that port
 
529
        s = socket.socket()
 
530
        s.bind(('localhost', 0))
 
531
        return 'sftp://%s:%s/' % s.getsockname()
 
532
 
 
533
 
 
534
class SFTPFullAbsoluteServer(SFTPServer):
 
535
    """A test server for sftp transports, using absolute urls and ssh."""
 
536
 
 
537
    def get_url(self):
 
538
        """See breezy.transport.Server.get_url."""
 
539
        homedir = self._homedir
 
540
        if sys.platform != 'win32':
 
541
            # Remove the initial '/' on all platforms but win32
 
542
            homedir = homedir[1:]
 
543
        return self._get_sftp_url(urlutils.escape(homedir))
 
544
 
 
545
 
 
546
class SFTPServerWithoutSSH(SFTPServer):
 
547
    """An SFTP server that uses a simple TCP socket pair rather than SSH."""
 
548
 
 
549
    def __init__(self):
 
550
        super(SFTPServerWithoutSSH, self).__init__()
 
551
        self._vendor = ssh.LoopbackVendor()
 
552
        self.request_handler_class = TestingSFTPWithoutSSHConnectionHandler
 
553
 
 
554
    def get_host_key():
 
555
        return None
 
556
 
 
557
 
 
558
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
 
559
    """A test server for sftp transports, using absolute urls."""
 
560
 
 
561
    def get_url(self):
 
562
        """See breezy.transport.Server.get_url."""
 
563
        homedir = self._homedir
 
564
        if sys.platform != 'win32':
 
565
            # Remove the initial '/' on all platforms but win32
 
566
            homedir = homedir[1:]
 
567
        return self._get_sftp_url(urlutils.escape(homedir))
 
568
 
 
569
 
 
570
class SFTPHomeDirServer(SFTPServerWithoutSSH):
 
571
    """A test server for sftp transports, using homedir relative urls."""
 
572
 
 
573
    def get_url(self):
 
574
        """See breezy.transport.Server.get_url."""
 
575
        return self._get_sftp_url("%7E/")
 
576
 
 
577
 
 
578
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
 
579
    """A test server for sftp transports where only absolute paths will work.
 
580
 
 
581
    It does this by serving from a deeply-nested directory that doesn't exist.
 
582
    """
 
583
 
 
584
    def create_server(self):
 
585
        # FIXME: Can't we do that in a cleaner way ? -- vila 20100623
 
586
        server = super(SFTPSiblingAbsoluteServer, self).create_server()
 
587
        server._server_homedir = '/dev/noone/runs/tests/here'
 
588
        return server