/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

separate out the client medium from the client encoding protocol for the smart server.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Smart-server protocol, client and server.
 
18
 
 
19
Requests are sent as a command and list of arguments, followed by optional
 
20
bulk body data.  Responses are similarly a response and list of arguments,
 
21
followed by bulk body data. ::
 
22
 
 
23
  SEP := '\001'
 
24
    Fields are separated by Ctrl-A.
 
25
  BULK_DATA := CHUNK+ TRAILER
 
26
    Chunks can be repeated as many times as necessary.
 
27
  CHUNK := CHUNK_LEN CHUNK_BODY
 
28
  CHUNK_LEN := DIGIT+ NEWLINE
 
29
    Gives the number of bytes in the following chunk.
 
30
  CHUNK_BODY := BYTE[chunk_len]
 
31
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
 
32
  SUCCESS_TRAILER := 'done' NEWLINE
 
33
  ERROR_TRAILER := 
 
34
 
 
35
Paths are passed across the network.  The client needs to see a namespace that
 
36
includes any repository that might need to be referenced, and the client needs
 
37
to know about a root directory beyond which it cannot ascend.
 
38
 
 
39
Servers run over ssh will typically want to be able to access any path the user 
 
40
can access.  Public servers on the other hand (which might be over http, ssh
 
41
or tcp) will typically want to restrict access to only a particular directory 
 
42
and its children, so will want to do a software virtual root at that level.
 
43
In other words they'll want to rewrite incoming paths to be under that level
 
44
(and prevent escaping using ../ tricks.)
 
45
 
 
46
URLs that include ~ should probably be passed across to the server verbatim
 
47
and the server can expand them.  This will proably not be meaningful when 
 
48
limited to a directory?
 
49
 
 
50
At the bottom level socket, pipes, HTTP server.  For sockets, we have the
 
51
idea that you have multiple requests and get have a read error because the
 
52
other side did shutdown sd send.  For pipes we have read pipe which will have a
 
53
zero read which marks end-of-file.  For HTTP server environment there is not
 
54
end-of-stream because each request coming into the server is independent.
 
55
 
 
56
So we need a wrapper around pipes and sockets to seperate out reqeusts from
 
57
substrate and this will give us a single model which is consist for HTTP,
 
58
sockets and pipes.
 
59
 
 
60
Server-side
 
61
-----------
 
62
 
 
63
 MEDIUM  (factory for protocol, reads bytes & pushes to protocol,
 
64
          uses protocol to detect end-of-request, sends written
 
65
          bytes to client) e.g. socket, pipe, HTTP request handler.
 
66
  ^
 
67
  | bytes.
 
68
  v
 
69
 
 
70
PROTOCOL  (serialisation, deserialisation)  accepts bytes for one
 
71
          request, decodes according to internal state, pushes
 
72
          structured data to handler.  accepts structured data from
 
73
          handler and encodes and writes to the medium.  factory for
 
74
          handler.
 
75
  ^
 
76
  | structured data
 
77
  v
 
78
 
 
79
HANDLER   (domain logic) accepts structured data, operates state
 
80
          machine until the request can be satisfied,
 
81
          sends structured data to the protocol.
 
82
 
 
83
 
 
84
Client-side
 
85
-----------
 
86
 
 
87
 CLIENT             domain logic, accepts domain requests, generated structured
 
88
                    data, reads structured data from responses and turns into
 
89
                    domain data.  Sends structured data to the protocol.
 
90
                    Operates state machines until the request can be delivered
 
91
                    (e.g. reading from a bundle generated in bzrlib to deliver a
 
92
                    complete request).
 
93
 
 
94
                    Possibly this should just be RemoteBzrDir, RemoteTransport,
 
95
                    ...
 
96
  ^
 
97
  | structured data
 
98
  v
 
99
 
 
100
PROTOCOL  (serialisation, deserialisation)  accepts structured data for one
 
101
          request, encodes and writes to the medium.  Reads bytes from the
 
102
          medium, decodes and allows the client to read structured data.
 
103
  ^
 
104
  | bytes.
 
105
  v
 
106
 
 
107
 MEDIUM  (accepts bytes from the protocol & delivers to the remote server.
 
108
          Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
 
109
"""
 
110
 
 
111
 
 
112
# TODO: _translate_error should be on the client, not the transport because
 
113
#     error coding is wire protocol specific.
 
114
 
 
115
# TODO: A plain integer from query_version is too simple; should give some
 
116
# capabilities too?
 
117
 
 
118
# TODO: Server should probably catch exceptions within itself and send them
 
119
# back across the network.  (But shouldn't catch KeyboardInterrupt etc)
 
120
# Also needs to somehow report protocol errors like bad requests.  Need to
 
121
# consider how we'll handle error reporting, e.g. if we get halfway through a
 
122
# bulk transfer and then something goes wrong.
 
123
 
 
124
# TODO: Standard marker at start of request/response lines?
 
125
 
 
126
# TODO: Make each request and response self-validatable, e.g. with checksums.
 
127
#
 
128
# TODO: get/put objects could be changed to gradually read back the data as it
 
129
# comes across the network
 
130
#
 
131
# TODO: What should the server do if it hits an error and has to terminate?
 
132
#
 
133
# TODO: is it useful to allow multiple chunks in the bulk data?
 
134
#
 
135
# TODO: If we get an exception during transmission of bulk data we can't just
 
136
# emit the exception because it won't be seen.
 
137
#   John proposes:  I think it would be worthwhile to have a header on each
 
138
#   chunk, that indicates it is another chunk. Then you can send an 'error'
 
139
#   chunk as long as you finish the previous chunk.
 
140
#
 
141
# TODO: Clone method on Transport; should work up towards parent directory;
 
142
# unclear how this should be stored or communicated to the server... maybe
 
143
# just pass it on all relevant requests?
 
144
#
 
145
# TODO: Better name than clone() for changing between directories.  How about
 
146
# open_dir or change_dir or chdir?
 
147
#
 
148
# TODO: Is it really good to have the notion of current directory within the
 
149
# connection?  Perhaps all Transports should factor out a common connection
 
150
# from the thing that has the directory context?
 
151
#
 
152
# TODO: Pull more things common to sftp and ssh to a higher level.
 
153
#
 
154
# TODO: The server that manages a connection should be quite small and retain
 
155
# minimum state because each of the requests are supposed to be stateless.
 
156
# Then we can write another implementation that maps to http.
 
157
#
 
158
# TODO: What to do when a client connection is garbage collected?  Maybe just
 
159
# abruptly drop the connection?
 
160
#
 
161
# TODO: Server in some cases will need to restrict access to files outside of
 
162
# a particular root directory.  LocalTransport doesn't do anything to stop you
 
163
# ascending above the base directory, so we need to prevent paths
 
164
# containing '..' in either the server or transport layers.  (Also need to
 
165
# consider what happens if someone creates a symlink pointing outside the 
 
166
# directory tree...)
 
167
#
 
168
# TODO: Server should rebase absolute paths coming across the network to put
 
169
# them under the virtual root, if one is in use.  LocalTransport currently
 
170
# doesn't do that; if you give it an absolute path it just uses it.
 
171
 
172
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
 
173
# urlescape them instead.  Indeed possibly this should just literally be
 
174
# http-over-ssh.
 
175
#
 
176
# FIXME: This transport, with several others, has imperfect handling of paths
 
177
# within urls.  It'd probably be better for ".." from a root to raise an error
 
178
# rather than return the same directory as we do at present.
 
179
#
 
180
# TODO: Rather than working at the Transport layer we want a Branch,
 
181
# Repository or BzrDir objects that talk to a server.
 
182
#
 
183
# TODO: Probably want some way for server commands to gradually produce body
 
184
# data rather than passing it as a string; they could perhaps pass an
 
185
# iterator-like callback that will gradually yield data; it probably needs a
 
186
# close() method that will always be closed to do any necessary cleanup.
 
187
#
 
188
# TODO: Split the actual smart server from the ssh encoding of it.
 
189
#
 
190
# TODO: Perhaps support file-level readwrite operations over the transport
 
191
# too.
 
192
#
 
193
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
 
194
# branch doing file-level operations.
 
195
#
 
196
# TODO: jam 20060915 _decode_tuple is acting directly on input over
 
197
#       the socket, and it assumes everything is UTF8 sections separated
 
198
#       by \001. Which means a request like '\002' Will abort the connection
 
199
#       because of a UnicodeDecodeError. It does look like invalid data will
 
200
#       kill the SmartServerStreamMedium, but only with an abort + exception, and 
 
201
#       the overall server shouldn't die.
 
202
 
 
203
from cStringIO import StringIO
 
204
import errno
 
205
import os
 
206
import socket
 
207
import sys
 
208
import tempfile
 
209
import threading
 
210
import urllib
 
211
import urlparse
 
212
 
 
213
from bzrlib import (
 
214
    bzrdir,
 
215
    errors,
 
216
    revision,
 
217
    transport,
 
218
    trace,
 
219
    urlutils,
 
220
    )
 
221
from bzrlib.bundle.serializer import write_bundle
 
222
from bzrlib.trace import mutter
 
223
from bzrlib.transport import local
 
224
 
 
225
# must do this otherwise urllib can't parse the urls properly :(
 
226
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
 
227
    transport.register_urlparse_netloc_protocol(scheme)
 
228
del scheme
 
229
 
 
230
 
 
231
def _recv_tuple(from_file):
 
232
    req_line = from_file.readline()
 
233
    return _decode_tuple(req_line)
 
234
 
 
235
 
 
236
def _decode_tuple(req_line):
 
237
    if req_line == None or req_line == '':
 
238
        return None
 
239
    if req_line[-1] != '\n':
 
240
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
 
241
    return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
 
242
 
 
243
 
 
244
def _encode_tuple(args):
 
245
    """Encode the tuple args to a bytestream."""
 
246
    return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
 
247
 
 
248
 
 
249
class SmartProtocolBase(object):
 
250
    """Methods common to client and server"""
 
251
 
 
252
    # TODO: this only actually accomodates a single block; possibly should support
 
253
    # multiple chunks?
 
254
    def _recv_bulk(self):
 
255
        # This is OBSOLETE except for the double handline in the server: 
 
256
        # the read_bulk + reencode noise.
 
257
        chunk_len = self._in.readline()
 
258
        try:
 
259
            chunk_len = int(chunk_len)
 
260
        except ValueError:
 
261
            raise errors.SmartProtocolError("bad chunk length line %r" % chunk_len)
 
262
        bulk = self._in.read(chunk_len)
 
263
        if len(bulk) != chunk_len:
 
264
            raise errors.SmartProtocolError("short read fetching bulk data chunk")
 
265
        self._recv_trailer()
 
266
        return bulk
 
267
 
 
268
    def _recv_trailer(self):
 
269
        resp = self._recv_tuple()
 
270
        if resp == ('done', ):
 
271
            return
 
272
        else:
 
273
            self._translate_error(resp)
 
274
 
 
275
    def _encode_bulk_data(self, body):
 
276
        """Encode body as a bulk data chunk."""
 
277
        return ''.join(('%d\n' % len(body), body, 'done\n'))
 
278
 
 
279
    def _serialise_offsets(self, offsets):
 
280
        """Serialise a readv offset list."""
 
281
        txt = []
 
282
        for start, length in offsets:
 
283
            txt.append('%d,%d' % (start, length))
 
284
        return '\n'.join(txt)
 
285
 
 
286
    def _send_bulk_data(self, body, a_file=None):
 
287
        """Send chunked body data"""
 
288
        assert isinstance(body, str)
 
289
        bytes = self._encode_bulk_data(body)
 
290
        self._write_and_flush(bytes, a_file)
 
291
 
 
292
    def _write_and_flush(self, bytes, a_file=None):
 
293
        """Write bytes to self._out and flush it."""
 
294
        # XXX: this will be inefficient.  Just ask Robert.
 
295
        if a_file is None:
 
296
            a_file = self._out
 
297
        a_file.write(bytes)
 
298
        a_file.flush()
 
299
        
 
300
 
 
301
class SmartServerRequestProtocolOne(SmartProtocolBase):
 
302
    """Server-side encoding and decoding logic for smart version 1."""
 
303
    
 
304
    def __init__(self, output_stream, backing_transport):
 
305
        self._out_stream = output_stream
 
306
        self._backing_transport = backing_transport
 
307
        self.finished_reading = False
 
308
        self.in_buffer = ''
 
309
        self.has_dispatched = False
 
310
        self.request = None
 
311
        self._body_decoder = None
 
312
 
 
313
    def accept_bytes(self, bytes):
 
314
        """Take bytes, and advance the internal state machine appropriately.
 
315
        
 
316
        :param bytes: must be a byte string
 
317
        """
 
318
        assert isinstance(bytes, str)
 
319
        self.in_buffer += bytes
 
320
        if not self.has_dispatched:
 
321
            if '\n' not in self.in_buffer:
 
322
                # no command line yet
 
323
                return
 
324
            self.has_dispatched = True
 
325
            # XXX if in_buffer not \n-terminated this will do the wrong
 
326
            # thing.
 
327
            try:
 
328
                assert self.in_buffer.endswith('\n')
 
329
                req_args = _decode_tuple(self.in_buffer)
 
330
                self.in_buffer = ''
 
331
                self.request = SmartServerRequestHandler(
 
332
                    self._backing_transport)
 
333
                self.request.dispatch_command(req_args[0], req_args[1:])
 
334
                if self.request.finished_reading:
 
335
                    # trivial request
 
336
                    self._send_response(self.request.response.args,
 
337
                        self.request.response.body)
 
338
                self.sync_with_request(self.request)
 
339
                return self.request
 
340
            except KeyboardInterrupt:
 
341
                raise
 
342
            except Exception, exception:
 
343
                # everything else: pass to client, flush, and quit
 
344
                self._send_response(('error', str(exception)))
 
345
                return None
 
346
 
 
347
        else:
 
348
            if self.finished_reading:
 
349
                # nothing to do.XXX: this routine should be a single state 
 
350
                # machine too.
 
351
                return
 
352
            if self._body_decoder is None:
 
353
                self._body_decoder = LengthPrefixedBodyDecoder()
 
354
            self._body_decoder.accept_bytes(self.in_buffer)
 
355
            self.in_buffer = self._body_decoder.unused_data
 
356
            body_data = self._body_decoder.read_pending_data()
 
357
            self.request.accept_body(body_data)
 
358
            if self._body_decoder.finished_reading:
 
359
                self.request.end_of_body()
 
360
                assert self.request.finished_reading, \
 
361
                    "no more body, request not finished"
 
362
            self.sync_with_request(self.request)
 
363
            if self.request.response is not None:
 
364
                self._send_response(self.request.response.args,
 
365
                    self.request.response.body)
 
366
            else:
 
367
                assert not self.request.finished_reading, \
 
368
                    "no response and we have finished reading."
 
369
 
 
370
    def _send_response(self, args, body=None):
 
371
        """Send a smart server response down the output stream."""
 
372
        self._out_stream.write(_encode_tuple(args))
 
373
        if body is None:
 
374
            self._out_stream.flush()
 
375
        else:
 
376
            self._send_bulk_data(body, self._out_stream)
 
377
            #self._out_stream.write('BLARGH')
 
378
 
 
379
    def sync_with_request(self, request):
 
380
        self.finished_reading = request.finished_reading
 
381
        
 
382
 
 
383
class LengthPrefixedBodyDecoder(object):
 
384
    """Decodes the length-prefixed bulk data."""
 
385
    
 
386
    def __init__(self):
 
387
        self.finished_reading = False
 
388
        self.unused_data = ''
 
389
        self.state_accept = self._state_accept_expecting_length
 
390
        self.state_read = self._state_read_no_data
 
391
        self._in_buffer = ''
 
392
        self._trailer_buffer = ''
 
393
    
 
394
    def accept_bytes(self, bytes):
 
395
        """Decode as much of bytes as possible.
 
396
 
 
397
        If 'bytes' contains too much data it will be appended to
 
398
        self.unused_data.
 
399
 
 
400
        finished_reading will be set when no more data is required.  Further
 
401
        data will be appended to self.unused_data.
 
402
        """
 
403
        # accept_bytes is allowed to change the state
 
404
        current_state = self.state_accept
 
405
        self.state_accept(bytes)
 
406
        while current_state != self.state_accept:
 
407
            current_state = self.state_accept
 
408
            self.state_accept('')
 
409
 
 
410
    def read_pending_data(self):
 
411
        """Return any pending data that has been decoded."""
 
412
        return self.state_read()
 
413
 
 
414
    def _state_accept_expecting_length(self, bytes):
 
415
        self._in_buffer += bytes
 
416
        pos = self._in_buffer.find('\n')
 
417
        if pos == -1:
 
418
            return
 
419
        self.bytes_left = int(self._in_buffer[:pos])
 
420
        self._in_buffer = self._in_buffer[pos+1:]
 
421
        self.bytes_left -= len(self._in_buffer)
 
422
        self.state_accept = self._state_accept_reading_body
 
423
        self.state_read = self._state_read_in_buffer
 
424
 
 
425
    def _state_accept_reading_body(self, bytes):
 
426
        self._in_buffer += bytes
 
427
        self.bytes_left -= len(bytes)
 
428
        if self.bytes_left <= 0:
 
429
            # Finished with body
 
430
            if self.bytes_left != 0:
 
431
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
 
432
                self._in_buffer = self._in_buffer[:self.bytes_left]
 
433
            self.state_accept = self._state_accept_reading_trailer
 
434
        
 
435
    def _state_accept_reading_trailer(self, bytes):
 
436
        self._trailer_buffer += bytes
 
437
        if self._trailer_buffer.startswith('done\n'):
 
438
            self.unused_data = self._trailer_buffer[len('done\n'):]
 
439
            self.state_accept = self._state_accept_reading_unused
 
440
            self.finished_reading = True
 
441
    
 
442
    def _state_accept_reading_unused(self, bytes):
 
443
        self.unused_data += bytes
 
444
 
 
445
    def _state_read_no_data(self):
 
446
        return ''
 
447
 
 
448
    def _state_read_in_buffer(self):
 
449
        result = self._in_buffer
 
450
        self._in_buffer = ''
 
451
        return result
 
452
 
 
453
 
 
454
class SmartServerStreamMedium(SmartProtocolBase):
 
455
    """Handles smart commands coming over a stream.
 
456
 
 
457
    The stream may be a pipe connected to sshd, or a tcp socket, or an
 
458
    in-process fifo for testing.
 
459
 
 
460
    One instance is created for each connected client; it can serve multiple
 
461
    requests in the lifetime of the connection.
 
462
 
 
463
    The server passes requests through to an underlying backing transport, 
 
464
    which will typically be a LocalTransport looking at the server's filesystem.
 
465
    """
 
466
 
 
467
    def __init__(self, in_file, out_file, backing_transport):
 
468
        """Construct new server.
 
469
 
 
470
        :param in_file: Python file from which requests can be read.
 
471
        :param out_file: Python file to write responses.
 
472
        :param backing_transport: Transport for the directory served.
 
473
        """
 
474
        self._in = in_file
 
475
        self._out = out_file
 
476
        self.backing_transport = backing_transport
 
477
 
 
478
    def _recv_tuple(self):
 
479
        """Read a request from the client and return as a tuple.
 
480
        
 
481
        Returns None at end of file (if the client closed the connection.)
 
482
        """
 
483
        # ** Deserialise and read bytes
 
484
        return _recv_tuple(self._in)
 
485
 
 
486
    def _send_tuple(self, args):
 
487
        """Send response header"""
 
488
        # ** serialise and write bytes
 
489
        return self._write_and_flush(_encode_tuple(args))
 
490
 
 
491
    def _send_error_and_disconnect(self, exception):
 
492
        # ** serialise and write bytes
 
493
        self._send_tuple(('error', str(exception)))
 
494
        ## self._out.close()
 
495
        ## self._in.close()
 
496
 
 
497
    def _serve_one_request(self):
 
498
        """Read one request from input, process, send back a response.
 
499
        
 
500
        :return: False if the server should terminate, otherwise None.
 
501
        """
 
502
        # ** deserialise, read bytes, serialise and write bytes
 
503
        req_line = self._in.readline()
 
504
        # this should just test "req_line == ''", surely?  -- Andrew Bennetts
 
505
        if req_line in ('', None):
 
506
            # client closed connection
 
507
            return False  # shutdown server
 
508
        try:
 
509
            protocol = SmartServerRequestProtocolOne(self._out,
 
510
                self.backing_transport)
 
511
            protocol.accept_bytes(req_line)
 
512
            if not protocol.finished_reading:
 
513
                # this boils down to readline which wont block on open sockets
 
514
                # without data. We should really though read as much as is
 
515
                # available and then hand to that accept_bytes without this
 
516
                # silly double-decode.
 
517
                bulk = self._recv_bulk()
 
518
                bulk_bytes = ''.join(('%d\n' % len(bulk), bulk, 'done\n'))
 
519
                protocol.accept_bytes(bulk_bytes)
 
520
                # might be nice to do protocol.end_of_bytes()
 
521
                # because self._recv_bulk reads all the bytes, this must finish
 
522
                # after one delivery of data rather than looping.
 
523
                assert protocol.finished_reading
 
524
        except KeyboardInterrupt:
 
525
            raise
 
526
        except Exception, e:
 
527
            # everything else: pass to client, flush, and quit
 
528
            self._send_error_and_disconnect(e)
 
529
            return False
 
530
 
 
531
    def serve(self):
 
532
        """Serve requests until the client disconnects."""
 
533
        # Keep a reference to stderr because the sys module's globals get set to
 
534
        # None during interpreter shutdown.
 
535
        from sys import stderr
 
536
        try:
 
537
            while self._serve_one_request() != False:
 
538
                pass
 
539
        except Exception, e:
 
540
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
541
            raise
 
542
 
 
543
 
 
544
class SmartServerResponse(object):
 
545
    """Response generated by SmartServerRequestHandler."""
 
546
 
 
547
    def __init__(self, args, body=None):
 
548
        self.args = args
 
549
        self.body = body
 
550
 
 
551
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
 
552
# for delivering the data for a request. This could be done with as the
 
553
# StreamServer, though that would create conflation between request and response
 
554
# which may be undesirable.
 
555
 
 
556
 
 
557
class SmartServerRequestHandler(object):
 
558
    """Protocol logic for smart server.
 
559
    
 
560
    This doesn't handle serialization at all, it just processes requests and
 
561
    creates responses.
 
562
    """
 
563
 
 
564
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
 
565
    # not contain encoding or decoding logic to allow the wire protocol to vary
 
566
    # from the object protocol: we will want to tweak the wire protocol separate
 
567
    # from the object model, and ideally we will be able to do that without
 
568
    # having a SmartServerRequestHandler subclass for each wire protocol, rather
 
569
    # just a Protocol subclass.
 
570
 
 
571
    # TODO: Better way of representing the body for commands that take it,
 
572
    # and allow it to be streamed into the server.
 
573
    
 
574
    def __init__(self, backing_transport):
 
575
        self._backing_transport = backing_transport
 
576
        self._converted_command = False
 
577
        self.finished_reading = False
 
578
        self._body_bytes = ''
 
579
        self.response = None
 
580
 
 
581
    def accept_body(self, bytes):
 
582
        """Accept body data.
 
583
 
 
584
        This should be overriden for each command that desired body data to
 
585
        handle the right format of that data. I.e. plain bytes, a bundle etc.
 
586
 
 
587
        The deserialisation into that format should be done in the Protocol
 
588
        object. Set self.desired_body_format to the format your method will
 
589
        handle.
 
590
        """
 
591
        # default fallback is to accumulate bytes.
 
592
        self._body_bytes += bytes
 
593
        
 
594
    def _end_of_body_handler(self):
 
595
        """An unimplemented end of body handler."""
 
596
        raise NotImplementedError(self._end_of_body_handler)
 
597
        
 
598
    def do_hello(self):
 
599
        """Answer a version request with my version."""
 
600
        return SmartServerResponse(('ok', '1'))
 
601
 
 
602
    def do_has(self, relpath):
 
603
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
 
604
        return SmartServerResponse((r,))
 
605
 
 
606
    def do_get(self, relpath):
 
607
        backing_bytes = self._backing_transport.get_bytes(relpath)
 
608
        return SmartServerResponse(('ok',), backing_bytes)
 
609
 
 
610
    def _deserialise_optional_mode(self, mode):
 
611
        # XXX: FIXME this should be on the protocol object.
 
612
        if mode == '':
 
613
            return None
 
614
        else:
 
615
            return int(mode)
 
616
 
 
617
    def do_append(self, relpath, mode):
 
618
        self._converted_command = True
 
619
        self._relpath = relpath
 
620
        self._mode = self._deserialise_optional_mode(mode)
 
621
        self._end_of_body_handler = self._handle_do_append_end
 
622
    
 
623
    def _handle_do_append_end(self):
 
624
        old_length = self._backing_transport.append_bytes(
 
625
            self._relpath, self._body_bytes, self._mode)
 
626
        self.response = SmartServerResponse(('appended', '%d' % old_length))
 
627
 
 
628
    def do_delete(self, relpath):
 
629
        self._backing_transport.delete(relpath)
 
630
 
 
631
    def do_iter_files_recursive(self, abspath):
 
632
        # XXX: the path handling needs some thought.
 
633
        #relpath = self._backing_transport.relpath(abspath)
 
634
        transport = self._backing_transport.clone(abspath)
 
635
        filenames = transport.iter_files_recursive()
 
636
        return SmartServerResponse(('names',) + tuple(filenames))
 
637
 
 
638
    def do_list_dir(self, relpath):
 
639
        filenames = self._backing_transport.list_dir(relpath)
 
640
        return SmartServerResponse(('names',) + tuple(filenames))
 
641
 
 
642
    def do_mkdir(self, relpath, mode):
 
643
        self._backing_transport.mkdir(relpath,
 
644
                                      self._deserialise_optional_mode(mode))
 
645
 
 
646
    def do_move(self, rel_from, rel_to):
 
647
        self._backing_transport.move(rel_from, rel_to)
 
648
 
 
649
    def do_put(self, relpath, mode):
 
650
        self._converted_command = True
 
651
        self._relpath = relpath
 
652
        self._mode = self._deserialise_optional_mode(mode)
 
653
        self._end_of_body_handler = self._handle_do_put
 
654
 
 
655
    def _handle_do_put(self):
 
656
        self._backing_transport.put_bytes(self._relpath,
 
657
                self._body_bytes, self._mode)
 
658
        self.response = SmartServerResponse(('ok',))
 
659
 
 
660
    def _deserialise_offsets(self, text):
 
661
        # XXX: FIXME this should be on the protocol object.
 
662
        offsets = []
 
663
        for line in text.split('\n'):
 
664
            if not line:
 
665
                continue
 
666
            start, length = line.split(',')
 
667
            offsets.append((int(start), int(length)))
 
668
        return offsets
 
669
 
 
670
    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
 
671
        self._converted_command = True
 
672
        self._end_of_body_handler = self._handle_put_non_atomic
 
673
        self._relpath = relpath
 
674
        self._dir_mode = self._deserialise_optional_mode(dir_mode)
 
675
        self._mode = self._deserialise_optional_mode(mode)
 
676
        # a boolean would be nicer XXX
 
677
        self._create_parent = (create_parent == 'T')
 
678
 
 
679
    def _handle_put_non_atomic(self):
 
680
        self._backing_transport.put_bytes_non_atomic(self._relpath,
 
681
                self._body_bytes,
 
682
                mode=self._mode,
 
683
                create_parent_dir=self._create_parent,
 
684
                dir_mode=self._dir_mode)
 
685
        self.response = SmartServerResponse(('ok',))
 
686
 
 
687
    def do_readv(self, relpath):
 
688
        self._converted_command = True
 
689
        self._end_of_body_handler = self._handle_readv_offsets
 
690
        self._relpath = relpath
 
691
 
 
692
    def end_of_body(self):
 
693
        """No more body data will be received."""
 
694
        self._run_handler_code(self._end_of_body_handler, (), {})
 
695
        # cannot read after this.
 
696
        self.finished_reading = True
 
697
 
 
698
    def _handle_readv_offsets(self):
 
699
        """accept offsets for a readv request."""
 
700
        offsets = self._deserialise_offsets(self._body_bytes)
 
701
        backing_bytes = ''.join(bytes for offset, bytes in
 
702
            self._backing_transport.readv(self._relpath, offsets))
 
703
        self.response = SmartServerResponse(('readv',), backing_bytes)
 
704
        
 
705
    def do_rename(self, rel_from, rel_to):
 
706
        self._backing_transport.rename(rel_from, rel_to)
 
707
 
 
708
    def do_rmdir(self, relpath):
 
709
        self._backing_transport.rmdir(relpath)
 
710
 
 
711
    def do_stat(self, relpath):
 
712
        stat = self._backing_transport.stat(relpath)
 
713
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
 
714
        
 
715
    def do_get_bundle(self, path, revision_id):
 
716
        # open transport relative to our base
 
717
        t = self._backing_transport.clone(path)
 
718
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
 
719
        repo = control.open_repository()
 
720
        tmpf = tempfile.TemporaryFile()
 
721
        base_revision = revision.NULL_REVISION
 
722
        write_bundle(repo, revision_id, base_revision, tmpf)
 
723
        tmpf.seek(0)
 
724
        return SmartServerResponse((), tmpf.read())
 
725
 
 
726
    def dispatch_command(self, cmd, args):
 
727
        """Deprecated compatibility method.""" # XXX XXX
 
728
        func = getattr(self, 'do_' + cmd, None)
 
729
        if func is None:
 
730
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
 
731
        self._run_handler_code(func, args, {})
 
732
 
 
733
    def _run_handler_code(self, callable, args, kwargs):
 
734
        """Run some handler specific code 'callable'.
 
735
 
 
736
        If a result is returned, it is considered to be the commands response,
 
737
        and finished_reading is set true, and its assigned to self.response.
 
738
 
 
739
        Any exceptions caught are translated and a response object created
 
740
        from them.
 
741
        """
 
742
        result = self._call_converting_errors(callable, args, kwargs)
 
743
        if result is not None:
 
744
            self.response = result
 
745
            self.finished_reading = True
 
746
        # handle unconverted commands
 
747
        if not self._converted_command:
 
748
            self.finished_reading = True
 
749
            if result is None:
 
750
                self.response = SmartServerResponse(('ok',))
 
751
 
 
752
    def _call_converting_errors(self, callable, args, kwargs):
 
753
        """Call callable converting errors to Response objects."""
 
754
        try:
 
755
            return callable(*args, **kwargs)
 
756
        except errors.NoSuchFile, e:
 
757
            return SmartServerResponse(('NoSuchFile', e.path))
 
758
        except errors.FileExists, e:
 
759
            return SmartServerResponse(('FileExists', e.path))
 
760
        except errors.DirectoryNotEmpty, e:
 
761
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
 
762
        except errors.ShortReadvError, e:
 
763
            return SmartServerResponse(('ShortReadvError',
 
764
                e.path, str(e.offset), str(e.length), str(e.actual)))
 
765
        except UnicodeError, e:
 
766
            # If it is a DecodeError, than most likely we are starting
 
767
            # with a plain string
 
768
            str_or_unicode = e.object
 
769
            if isinstance(str_or_unicode, unicode):
 
770
                val = u'u:' + str_or_unicode
 
771
            else:
 
772
                val = u's:' + str_or_unicode.encode('base64')
 
773
            # This handles UnicodeEncodeError or UnicodeDecodeError
 
774
            return SmartServerResponse((e.__class__.__name__,
 
775
                    e.encoding, val, str(e.start), str(e.end), e.reason))
 
776
        except errors.TransportNotPossible, e:
 
777
            if e.msg == "readonly transport":
 
778
                return SmartServerResponse(('ReadOnlyError', ))
 
779
            else:
 
780
                raise
 
781
 
 
782
 
 
783
class SmartTCPServer(object):
 
784
    """Listens on a TCP socket and accepts connections from smart clients"""
 
785
 
 
786
    def __init__(self, backing_transport=None, host='127.0.0.1', port=0):
 
787
        """Construct a new server.
 
788
 
 
789
        To actually start it running, call either start_background_thread or
 
790
        serve.
 
791
 
 
792
        :param host: Name of the interface to listen on.
 
793
        :param port: TCP port to listen on, or 0 to allocate a transient port.
 
794
        """
 
795
        if backing_transport is None:
 
796
            backing_transport = memory.MemoryTransport()
 
797
        self._server_socket = socket.socket()
 
798
        self._server_socket.bind((host, port))
 
799
        self.port = self._server_socket.getsockname()[1]
 
800
        self._server_socket.listen(1)
 
801
        self._server_socket.settimeout(1)
 
802
        self.backing_transport = backing_transport
 
803
 
 
804
    def serve(self):
 
805
        # let connections timeout so that we get a chance to terminate
 
806
        # Keep a reference to the exceptions we want to catch because the socket
 
807
        # module's globals get set to None during interpreter shutdown.
 
808
        from socket import timeout as socket_timeout
 
809
        from socket import error as socket_error
 
810
        self._should_terminate = False
 
811
        while not self._should_terminate:
 
812
            try:
 
813
                self.accept_and_serve()
 
814
            except socket_timeout:
 
815
                # just check if we're asked to stop
 
816
                pass
 
817
            except socket_error, e:
 
818
                trace.warning("client disconnected: %s", e)
 
819
                pass
 
820
 
 
821
    def get_url(self):
 
822
        """Return the url of the server"""
 
823
        return "bzr://%s:%d/" % self._server_socket.getsockname()
 
824
 
 
825
    def accept_and_serve(self):
 
826
        conn, client_addr = self._server_socket.accept()
 
827
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
828
        from_client = conn.makefile('r')
 
829
        to_client = conn.makefile('w')
 
830
        handler = SmartServerStreamMedium(from_client, to_client,
 
831
                self.backing_transport)
 
832
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
 
833
        connection_thread.setDaemon(True)
 
834
        connection_thread.start()
 
835
 
 
836
    def start_background_thread(self):
 
837
        self._server_thread = threading.Thread(None,
 
838
                self.serve,
 
839
                name='server-' + self.get_url())
 
840
        self._server_thread.setDaemon(True)
 
841
        self._server_thread.start()
 
842
 
 
843
    def stop_background_thread(self):
 
844
        self._should_terminate = True
 
845
        # self._server_socket.close()
 
846
        # we used to join the thread, but it's not really necessary; it will
 
847
        # terminate in time
 
848
        ## self._server_thread.join()
 
849
 
 
850
 
 
851
class SmartTCPServer_for_testing(SmartTCPServer):
 
852
    """Server suitable for use by transport tests.
 
853
    
 
854
    This server is backed by the process's cwd.
 
855
    """
 
856
 
 
857
    def __init__(self):
 
858
        self._homedir = os.getcwd()
 
859
        # The server is set up by default like for ssh access: the client
 
860
        # passes filesystem-absolute paths; therefore the server must look
 
861
        # them up relative to the root directory.  it might be better to act
 
862
        # a public server and have the server rewrite paths into the test
 
863
        # directory.
 
864
        SmartTCPServer.__init__(self, transport.get_transport("file:///"))
 
865
        
 
866
    def setUp(self):
 
867
        """Set up server for testing"""
 
868
        self.start_background_thread()
 
869
 
 
870
    def tearDown(self):
 
871
        self.stop_background_thread()
 
872
 
 
873
    def get_url(self):
 
874
        """Return the url of the server"""
 
875
        host, port = self._server_socket.getsockname()
 
876
        # XXX: I think this is likely to break on windows -- self._homedir will
 
877
        # have backslashes (and maybe a drive letter?).
 
878
        #  -- Andrew Bennetts, 2006-08-29
 
879
        return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
 
880
 
 
881
    def get_bogus_url(self):
 
882
        """Return a URL which will fail to connect"""
 
883
        return 'bzr://127.0.0.1:1/'
 
884
 
 
885
 
 
886
class SmartStat(object):
 
887
 
 
888
    def __init__(self, size, mode):
 
889
        self.st_size = size
 
890
        self.st_mode = mode
 
891
 
 
892
 
 
893
class SmartTransport(transport.Transport):
 
894
    """Connection to a smart server.
 
895
 
 
896
    The connection holds references to pipes that can be used to send requests
 
897
    to the server.
 
898
 
 
899
    The connection has a notion of the current directory to which it's
 
900
    connected; this is incorporated in filenames passed to the server.
 
901
    
 
902
    This supports some higher-level RPC operations and can also be treated 
 
903
    like a Transport to do file-like operations.
 
904
 
 
905
    The connection can be made over a tcp socket, or (in future) an ssh pipe
 
906
    or a series of http requests.  There are concrete subclasses for each
 
907
    type: SmartTCPTransport, etc.
 
908
    """
 
909
 
 
910
    # IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
 
911
    # responsibilities: Put those on SmartClient or similar. This is vital for
 
912
    # the ability to support multiple versions of the smart protocol over time:
 
913
    # SmartTransport is an adapter from the Transport object model to the 
 
914
    # SmartClient model, not an encoder.
 
915
 
 
916
    def __init__(self, url, clone_from=None, medium=None):
 
917
        """Constructor.
 
918
 
 
919
        :param medium: The medium to use for this RemoteTransport. This must be
 
920
            supplied if clone_from is None.
 
921
        """
 
922
        ### Technically super() here is faulty because Transport's __init__
 
923
        ### fails to take 2 parameters, and if super were to choose a silly
 
924
        ### initialisation order things would blow up. 
 
925
        if not url.endswith('/'):
 
926
            url += '/'
 
927
        super(SmartTransport, self).__init__(url)
 
928
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
 
929
                transport.split_url(url)
 
930
        if clone_from is None:
 
931
            self._medium = medium
 
932
        else:
 
933
            # credentials may be stripped from the base in some circumstances
 
934
            # as yet to be clearly defined or documented, so copy them.
 
935
            self._username = clone_from._username
 
936
            # reuse same connection
 
937
            self._medium = clone_from._medium
 
938
        assert self._medium is not None
 
939
 
 
940
    def abspath(self, relpath):
 
941
        """Return the full url to the given relative path.
 
942
        
 
943
        @param relpath: the relative path or path components
 
944
        @type relpath: str or list
 
945
        """
 
946
        return self._unparse_url(self._remote_path(relpath))
 
947
    
 
948
    def clone(self, relative_url):
 
949
        """Make a new SmartTransport related to me, sharing the same connection.
 
950
 
 
951
        This essentially opens a handle on a different remote directory.
 
952
        """
 
953
        if relative_url is None:
 
954
            return SmartTransport(self.base, self)
 
955
        else:
 
956
            return SmartTransport(self.abspath(relative_url), self)
 
957
 
 
958
    def is_readonly(self):
 
959
        """Smart server transport can do read/write file operations."""
 
960
        return False
 
961
                                                   
 
962
    def get_smart_client(self):
 
963
        return self._medium
 
964
 
 
965
    def get_smart_medium(self):
 
966
        return self._medium
 
967
                                                   
 
968
    def _unparse_url(self, path):
 
969
        """Return URL for a path.
 
970
 
 
971
        :see: SFTPUrlHandling._unparse_url
 
972
        """
 
973
        # TODO: Eventually it should be possible to unify this with
 
974
        # SFTPUrlHandling._unparse_url?
 
975
        if path == '':
 
976
            path = '/'
 
977
        path = urllib.quote(path)
 
978
        netloc = urllib.quote(self._host)
 
979
        if self._username is not None:
 
980
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
 
981
        if self._port is not None:
 
982
            netloc = '%s:%d' % (netloc, self._port)
 
983
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
 
984
 
 
985
    def _remote_path(self, relpath):
 
986
        """Returns the Unicode version of the absolute path for relpath."""
 
987
        return self._combine_paths(self._path, relpath)
 
988
 
 
989
    def _call(self, method, *args):
 
990
        resp = self._call2(method, *args)
 
991
        self._translate_error(resp)
 
992
 
 
993
    def _call2(self, method, *args):
 
994
        """Call a method on the remote server."""
 
995
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
996
        protocol.call(method, *args)
 
997
        return protocol.read_response_tuple()
 
998
 
 
999
    def _call_with_body_bytes(self, method, args, body):
 
1000
        """Call a method on the remote server with body bytes."""
 
1001
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1002
        protocol.call_with_body_bytes((method, ) + args, body)
 
1003
        return protocol.read_response_tuple()
 
1004
 
 
1005
    def has(self, relpath):
 
1006
        """Indicate whether a remote file of the given name exists or not.
 
1007
 
 
1008
        :see: Transport.has()
 
1009
        """
 
1010
        resp = self._call2('has', self._remote_path(relpath))
 
1011
        if resp == ('yes', ):
 
1012
            return True
 
1013
        elif resp == ('no', ):
 
1014
            return False
 
1015
        else:
 
1016
            self._translate_error(resp)
 
1017
 
 
1018
    def get(self, relpath):
 
1019
        """Return file-like object reading the contents of a remote file.
 
1020
        
 
1021
        :see: Transport.get_bytes()/get_file()
 
1022
        """
 
1023
        return StringIO(self.get_bytes(relpath))
 
1024
 
 
1025
    def get_bytes(self, relpath):
 
1026
        remote = self._remote_path(relpath)
 
1027
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1028
        protocol.call('get', remote)
 
1029
        resp = protocol.read_response_tuple(True)
 
1030
        if resp != ('ok', ):
 
1031
            protocol.cancel_read_body()
 
1032
            self._translate_error(resp, relpath)
 
1033
        return protocol.read_body_bytes()
 
1034
 
 
1035
    def _serialise_optional_mode(self, mode):
 
1036
        if mode is None:
 
1037
            return ''
 
1038
        else:
 
1039
            return '%d' % mode
 
1040
 
 
1041
    def mkdir(self, relpath, mode=None):
 
1042
        resp = self._call2('mkdir', self._remote_path(relpath),
 
1043
            self._serialise_optional_mode(mode))
 
1044
        self._translate_error(resp)
 
1045
 
 
1046
    def put_bytes(self, relpath, upload_contents, mode=None):
 
1047
        # FIXME: upload_file is probably not safe for non-ascii characters -
 
1048
        # should probably just pass all parameters as length-delimited
 
1049
        # strings?
 
1050
        resp = self._call_with_body_bytes('put',
 
1051
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
1052
            upload_contents)
 
1053
        self._translate_error(resp)
 
1054
 
 
1055
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
 
1056
                             create_parent_dir=False,
 
1057
                             dir_mode=None):
 
1058
        """See Transport.put_bytes_non_atomic."""
 
1059
        # FIXME: no encoding in the transport!
 
1060
        create_parent_str = 'F'
 
1061
        if create_parent_dir:
 
1062
            create_parent_str = 'T'
 
1063
 
 
1064
        resp = self._call_with_body_bytes(
 
1065
            'put_non_atomic',
 
1066
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
 
1067
             create_parent_str, self._serialise_optional_mode(dir_mode)),
 
1068
            bytes)
 
1069
        self._translate_error(resp)
 
1070
 
 
1071
    def put_file(self, relpath, upload_file, mode=None):
 
1072
        # its not ideal to seek back, but currently put_non_atomic_file depends
 
1073
        # on transports not reading before failing - which is a faulty
 
1074
        # assumption I think - RBC 20060915
 
1075
        pos = upload_file.tell()
 
1076
        try:
 
1077
            return self.put_bytes(relpath, upload_file.read(), mode)
 
1078
        except:
 
1079
            upload_file.seek(pos)
 
1080
            raise
 
1081
 
 
1082
    def put_file_non_atomic(self, relpath, f, mode=None,
 
1083
                            create_parent_dir=False,
 
1084
                            dir_mode=None):
 
1085
        return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
 
1086
                                         create_parent_dir=create_parent_dir,
 
1087
                                         dir_mode=dir_mode)
 
1088
 
 
1089
    def append_file(self, relpath, from_file, mode=None):
 
1090
        return self.append_bytes(relpath, from_file.read(), mode)
 
1091
        
 
1092
    def append_bytes(self, relpath, bytes, mode=None):
 
1093
        resp = self._call_with_body_bytes(
 
1094
            'append',
 
1095
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
1096
            bytes)
 
1097
        if resp[0] == 'appended':
 
1098
            return int(resp[1])
 
1099
        self._translate_error(resp)
 
1100
 
 
1101
    def delete(self, relpath):
 
1102
        resp = self._call2('delete', self._remote_path(relpath))
 
1103
        self._translate_error(resp)
 
1104
 
 
1105
    def readv(self, relpath, offsets):
 
1106
        if not offsets:
 
1107
            return
 
1108
 
 
1109
        offsets = list(offsets)
 
1110
 
 
1111
        sorted_offsets = sorted(offsets)
 
1112
        # turn the list of offsets into a stack
 
1113
        offset_stack = iter(offsets)
 
1114
        cur_offset_and_size = offset_stack.next()
 
1115
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
1116
                               limit=self._max_readv_combine,
 
1117
                               fudge_factor=self._bytes_to_read_before_seek))
 
1118
 
 
1119
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1120
        protocol.call_with_body_readv_array(
 
1121
            ('readv', self._remote_path(relpath)),
 
1122
            [(c.start, c.length) for c in coalesced])
 
1123
        resp = protocol.read_response_tuple(True)
 
1124
 
 
1125
        if resp[0] != 'readv':
 
1126
            # This should raise an exception
 
1127
            protocol.cancel_read_body()
 
1128
            self._translate_error(resp)
 
1129
            return
 
1130
 
 
1131
        # FIXME: this should know how many bytes are needed, for clarity.
 
1132
        data = protocol.read_body_bytes()
 
1133
        # Cache the results, but only until they have been fulfilled
 
1134
        data_map = {}
 
1135
        for c_offset in coalesced:
 
1136
            if len(data) < c_offset.length:
 
1137
                raise errors.ShortReadvError(relpath, c_offset.start,
 
1138
                            c_offset.length, actual=len(data))
 
1139
            for suboffset, subsize in c_offset.ranges:
 
1140
                key = (c_offset.start+suboffset, subsize)
 
1141
                data_map[key] = data[suboffset:suboffset+subsize]
 
1142
            data = data[c_offset.length:]
 
1143
 
 
1144
            # Now that we've read some data, see if we can yield anything back
 
1145
            while cur_offset_and_size in data_map:
 
1146
                this_data = data_map.pop(cur_offset_and_size)
 
1147
                yield cur_offset_and_size[0], this_data
 
1148
                cur_offset_and_size = offset_stack.next()
 
1149
 
 
1150
    def rename(self, rel_from, rel_to):
 
1151
        self._call('rename',
 
1152
                   self._remote_path(rel_from),
 
1153
                   self._remote_path(rel_to))
 
1154
 
 
1155
    def move(self, rel_from, rel_to):
 
1156
        self._call('move',
 
1157
                   self._remote_path(rel_from),
 
1158
                   self._remote_path(rel_to))
 
1159
 
 
1160
    def rmdir(self, relpath):
 
1161
        resp = self._call('rmdir', self._remote_path(relpath))
 
1162
 
 
1163
    def _translate_error(self, resp, orig_path=None):
 
1164
        """Raise an exception from a response"""
 
1165
        if resp is None:
 
1166
            what = None
 
1167
        else:
 
1168
            what = resp[0]
 
1169
        if what == 'ok':
 
1170
            return
 
1171
        elif what == 'NoSuchFile':
 
1172
            if orig_path is not None:
 
1173
                error_path = orig_path
 
1174
            else:
 
1175
                error_path = resp[1]
 
1176
            raise errors.NoSuchFile(error_path)
 
1177
        elif what == 'error':
 
1178
            raise errors.SmartProtocolError(unicode(resp[1]))
 
1179
        elif what == 'FileExists':
 
1180
            raise errors.FileExists(resp[1])
 
1181
        elif what == 'DirectoryNotEmpty':
 
1182
            raise errors.DirectoryNotEmpty(resp[1])
 
1183
        elif what == 'ShortReadvError':
 
1184
            raise errors.ShortReadvError(resp[1], int(resp[2]),
 
1185
                                         int(resp[3]), int(resp[4]))
 
1186
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
 
1187
            encoding = str(resp[1]) # encoding must always be a string
 
1188
            val = resp[2]
 
1189
            start = int(resp[3])
 
1190
            end = int(resp[4])
 
1191
            reason = str(resp[5]) # reason must always be a string
 
1192
            if val.startswith('u:'):
 
1193
                val = val[2:]
 
1194
            elif val.startswith('s:'):
 
1195
                val = val[2:].decode('base64')
 
1196
            if what == 'UnicodeDecodeError':
 
1197
                raise UnicodeDecodeError(encoding, val, start, end, reason)
 
1198
            elif what == 'UnicodeEncodeError':
 
1199
                raise UnicodeEncodeError(encoding, val, start, end, reason)
 
1200
        elif what == "ReadOnlyError":
 
1201
            raise errors.TransportNotPossible('readonly transport')
 
1202
        else:
 
1203
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
 
1204
 
 
1205
    def disconnect(self):
 
1206
        self._medium.disconnect()
 
1207
 
 
1208
    def delete_tree(self, relpath):
 
1209
        raise errors.TransportNotPossible('readonly transport')
 
1210
 
 
1211
    def stat(self, relpath):
 
1212
        resp = self._call2('stat', self._remote_path(relpath))
 
1213
        if resp[0] == 'stat':
 
1214
            return SmartStat(int(resp[1]), int(resp[2], 8))
 
1215
        else:
 
1216
            self._translate_error(resp)
 
1217
 
 
1218
    ## def lock_read(self, relpath):
 
1219
    ##     """Lock the given file for shared (read) access.
 
1220
    ##     :return: A lock object, which should be passed to Transport.unlock()
 
1221
    ##     """
 
1222
    ##     # The old RemoteBranch ignore lock for reading, so we will
 
1223
    ##     # continue that tradition and return a bogus lock object.
 
1224
    ##     class BogusLock(object):
 
1225
    ##         def __init__(self, path):
 
1226
    ##             self.path = path
 
1227
    ##         def unlock(self):
 
1228
    ##             pass
 
1229
    ##     return BogusLock(relpath)
 
1230
 
 
1231
    def listable(self):
 
1232
        return True
 
1233
 
 
1234
    def list_dir(self, relpath):
 
1235
        resp = self._call2('list_dir', self._remote_path(relpath))
 
1236
        if resp[0] == 'names':
 
1237
            return [name.encode('ascii') for name in resp[1:]]
 
1238
        else:
 
1239
            self._translate_error(resp)
 
1240
 
 
1241
    def iter_files_recursive(self):
 
1242
        resp = self._call2('iter_files_recursive', self._remote_path(''))
 
1243
        if resp[0] == 'names':
 
1244
            return resp[1:]
 
1245
        else:
 
1246
            self._translate_error(resp)
 
1247
 
 
1248
 
 
1249
class SmartClientMediumRequest(object):
 
1250
    """A request on a SmartClientMedium.
 
1251
 
 
1252
    Each request allows bytes to be provided to it via accept_bytes, and then
 
1253
    the response bytes to be read via read_bytes.
 
1254
 
 
1255
    For instance:
 
1256
    request.accept_bytes('123')
 
1257
    request.finished_writing()
 
1258
    result = request.read_bytes(3)
 
1259
    request.finished_reading()
 
1260
 
 
1261
    It is up to the individual SmartClientMedium whether multiple concurrent
 
1262
    requests can exist. See SmartClientMedium.get_request to obtain instances 
 
1263
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
1264
    details on concurrency and pipelining.
 
1265
    """
 
1266
 
 
1267
    def __init__(self, medium):
 
1268
        """Construct a SmartClientMediumRequest for the medium medium."""
 
1269
        self._medium = medium
 
1270
        # we track state by constants - we may want to use the same
 
1271
        # pattern as BodyReader if it gets more complex.
 
1272
        # valid states are: "writing", "reading", "done"
 
1273
        self._state = "writing"
 
1274
 
 
1275
    def accept_bytes(self, bytes):
 
1276
        """Accept bytes for inclusion in this request.
 
1277
 
 
1278
        This method may not be be called after finished_writing() has been
 
1279
        called.  It depends upon the Medium whether or not the bytes will be
 
1280
        immediately transmitted. Message based Mediums will tend to buffer the
 
1281
        bytes until finished_writing() is called.
 
1282
 
 
1283
        :param bytes: A bytestring.
 
1284
        """
 
1285
        if self._state != "writing":
 
1286
            raise errors.WritingCompleted(self)
 
1287
        self._accept_bytes(bytes)
 
1288
 
 
1289
    def _accept_bytes(self, bytes):
 
1290
        """Helper for accept_bytes.
 
1291
 
 
1292
        Accept_bytes checks the state of the request to determing if bytes
 
1293
        should be accepted. After that it hands off to _accept_bytes to do the
 
1294
        actual acceptance.
 
1295
        """
 
1296
        raise NotImplementedError(self._accept_bytes)
 
1297
 
 
1298
    def finished_reading(self):
 
1299
        """Inform the request that all desired data has been read.
 
1300
 
 
1301
        This will remove the request from the pipeline for its medium (if the
 
1302
        medium supports pipelining) and any further calls to methods on the
 
1303
        request will raise ReadingCompleted.
 
1304
        """
 
1305
        if self._state == "writing":
 
1306
            raise errors.WritingNotComplete(self)
 
1307
        if self._state != "reading":
 
1308
            raise errors.ReadingCompleted(self)
 
1309
        self._state = "done"
 
1310
        self._finished_reading()
 
1311
 
 
1312
    def _finished_reading(self):
 
1313
        """Helper for finished_reading.
 
1314
 
 
1315
        finished_reading checks the state of the request to determine if 
 
1316
        finished_reading is allowed, and if it is hands off to _finished_reading
 
1317
        to perform the action.
 
1318
        """
 
1319
        raise NotImplementedError(self._finished_reading)
 
1320
 
 
1321
    def finished_writing(self):
 
1322
        """Finish the writing phase of this request.
 
1323
 
 
1324
        This will flush all pending data for this request along the medium.
 
1325
        After calling finished_writing, you may not call accept_bytes anymore.
 
1326
        """
 
1327
        if self._state != "writing":
 
1328
            raise errors.WritingCompleted(self)
 
1329
        self._state = "reading"
 
1330
        self._finished_writing()
 
1331
 
 
1332
    def _finished_writing(self):
 
1333
        """Helper for finished_writing.
 
1334
 
 
1335
        finished_writing checks the state of the request to determine if 
 
1336
        finished_writing is allowed, and if it is hands off to _finished_writing
 
1337
        to perform the action.
 
1338
        """
 
1339
        raise NotImplementedError(self._finished_writing)
 
1340
 
 
1341
    def read_bytes(self, count):
 
1342
        """Read bytes from this requests response.
 
1343
 
 
1344
        This method will block and wait for count bytes to be read. It may not
 
1345
        be invoked until finished_writing() has been called - this is to ensure
 
1346
        a message-based approach to requests, for compatability with message
 
1347
        based mediums like HTTP.
 
1348
        """
 
1349
        if self._state == "writing":
 
1350
            raise errors.WritingNotComplete(self)
 
1351
        if self._state != "reading":
 
1352
            raise errors.ReadingCompleted(self)
 
1353
        return self._read_bytes(count)
 
1354
 
 
1355
    def _read_bytes(self, count):
 
1356
        """Helper for read_bytes.
 
1357
 
 
1358
        read_bytes checks the state of the request to determing if bytes
 
1359
        should be read. After that it hands off to _read_bytes to do the
 
1360
        actual read.
 
1361
        """
 
1362
        raise NotImplementedError(self._read_bytes)
 
1363
 
 
1364
 
 
1365
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
 
1366
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
 
1367
 
 
1368
    def __init__(self, medium):
 
1369
        SmartClientMediumRequest.__init__(self, medium)
 
1370
        # check that we are safe concurrency wise. If some streams start
 
1371
        # allowing concurrent requests - i.e. via multiplexing - then this
 
1372
        # assert should be moved to SmartClientStreamMedium.get_request,
 
1373
        # and the setting/unsetting of _current_request likewise moved into
 
1374
        # that class : but its unneeded overhead for now. RBC 20060922
 
1375
        if self._medium._current_request is not None:
 
1376
            raise errors.TooManyConcurrentRequests(self._medium)
 
1377
        self._medium._current_request = self
 
1378
 
 
1379
    def _accept_bytes(self, bytes):
 
1380
        """See SmartClientMediumRequest._accept_bytes.
 
1381
        
 
1382
        This forwards to self._medium._accept_bytes because we are operating
 
1383
        on the mediums stream.
 
1384
        """
 
1385
        self._medium._accept_bytes(bytes)
 
1386
 
 
1387
    def _finished_reading(self):
 
1388
        """See SmartClientMediumRequest._finished_reading.
 
1389
 
 
1390
        This clears the _current_request on self._medium to allow a new 
 
1391
        request to be created.
 
1392
        """
 
1393
        assert self._medium._current_request is self
 
1394
        self._medium._current_request = None
 
1395
        
 
1396
    def _finished_writing(self):
 
1397
        """See SmartClientMediumRequest._finished_writing.
 
1398
 
 
1399
        This invokes self._medium._flush to ensure all bytes are transmitted.
 
1400
        """
 
1401
        self._medium._flush()
 
1402
 
 
1403
    def _read_bytes(self, count):
 
1404
        """See SmartClientMediumRequest._read_bytes.
 
1405
        
 
1406
        This forwards to self._medium._read_bytes because we are operating
 
1407
        on the mediums stream.
 
1408
        """
 
1409
        return self._medium._read_bytes(count)
 
1410
 
 
1411
 
 
1412
class SmartClientRequestProtocolOne(SmartProtocolBase):
 
1413
    """The client-side protocol for smart version 1."""
 
1414
 
 
1415
    def __init__(self, request):
 
1416
        """Construct a SmartClientRequestProtocolOne.
 
1417
 
 
1418
        :param request: A SmartClientMediumRequest to serialise onto and
 
1419
            deserialise from.
 
1420
        """
 
1421
        self._request = request
 
1422
        self._body_buffer = None
 
1423
 
 
1424
    def call(self, *args):
 
1425
        bytes = _encode_tuple(args)
 
1426
        self._request.accept_bytes(bytes)
 
1427
        self._request.finished_writing()
 
1428
 
 
1429
    def call_with_body_bytes(self, args, body):
 
1430
        """Make a remote call of args with body bytes 'body'.
 
1431
 
 
1432
        After calling this, call read_response_tuple to find the result out.
 
1433
        """
 
1434
        bytes = _encode_tuple(args)
 
1435
        self._request.accept_bytes(bytes)
 
1436
        bytes = self._encode_bulk_data(body)
 
1437
        self._request.accept_bytes(bytes)
 
1438
        self._request.finished_writing()
 
1439
 
 
1440
    def call_with_body_readv_array(self, args, body):
 
1441
        """Make a remote call with a readv array.
 
1442
 
 
1443
        The body is encoded with one line per readv offset pair. The numbers in
 
1444
        each pair are separated by a comma, and no trailing \n is emitted.
 
1445
        """
 
1446
        bytes = _encode_tuple(args)
 
1447
        self._request.accept_bytes(bytes)
 
1448
        readv_bytes = self._serialise_offsets(body)
 
1449
        bytes = self._encode_bulk_data(readv_bytes)
 
1450
        self._request.accept_bytes(bytes)
 
1451
        self._request.finished_writing()
 
1452
 
 
1453
    def cancel_read_body(self):
 
1454
        """After expecting a body, a response code may indicate one otherwise.
 
1455
 
 
1456
        This method lets the domain client inform the protocol that no body
 
1457
        will be transmitted. This is a terminal method: after calling it the
 
1458
        protocol is not able to be used further.
 
1459
        """
 
1460
        self._request.finished_reading()
 
1461
 
 
1462
    def read_response_tuple(self, expect_body=False):
 
1463
        """Read a response tuple from the wire.
 
1464
 
 
1465
        This should only be called once.
 
1466
        """
 
1467
        result = self._recv_tuple()
 
1468
        if not expect_body:
 
1469
            self._request.finished_reading()
 
1470
        return result
 
1471
 
 
1472
    def read_body_bytes(self, count=-1):
 
1473
        """Read bytes from the body, decoding into a byte stream.
 
1474
        
 
1475
        We read all bytes at once to ensure we've checked the trailer for 
 
1476
        errors, and then feed the buffer back as read_body_bytes is called.
 
1477
        """
 
1478
        if self._body_buffer is not None:
 
1479
            return self._body_buffer.read(count)
 
1480
        _body_decoder = LengthPrefixedBodyDecoder()
 
1481
        # grab a byte from the wire: we do this so that we dont use too much
 
1482
        # from the wire; we should have the LengthPrefixedBodyDecoder tell
 
1483
        # us how much is needed once the header is written.
 
1484
        # i.e. self._body_decoder.next_read_size() would be a hint. 
 
1485
        while not _body_decoder.finished_reading:
 
1486
            byte = self._request.read_bytes(1)
 
1487
            _body_decoder.accept_bytes(byte)
 
1488
        self._request.finished_reading()
 
1489
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
 
1490
        # XXX: TODO check the trailer result.
 
1491
        return self._body_buffer.read(count)
 
1492
 
 
1493
    def _recv_tuple(self):
 
1494
        """Recieve a tuple from the medium request."""
 
1495
        line = ''
 
1496
        while not line or line[-1] != '\n':
 
1497
            # yes, this is inefficient - but tuples are short.
 
1498
            new_char = self._request.read_bytes(1)
 
1499
            line += new_char
 
1500
            assert new_char != '', "end of file reading from server."
 
1501
        return _decode_tuple(line)
 
1502
 
 
1503
    def query_version(self):
 
1504
        """Return protocol version number of the server."""
 
1505
        self.call('hello')
 
1506
        resp = self.read_response_tuple()
 
1507
        if resp == ('ok', '1'):
 
1508
            return 1
 
1509
        else:
 
1510
            raise errors.SmartProtocolError("bad response %r" % (resp,))
 
1511
 
 
1512
 
 
1513
class SmartClientMedium(object):
 
1514
    """Smart client is a medium for sending smart protocol requests over."""
 
1515
 
 
1516
    def disconnect(self):
 
1517
        """If this medium maintains a persistent connection, close it.
 
1518
        
 
1519
        The default implementation does nothing.
 
1520
        """
 
1521
        
 
1522
 
 
1523
class SmartClientStreamMedium(SmartClientMedium):
 
1524
    """Stream based medium common class.
 
1525
 
 
1526
    SmartClientStreamMediums operate on a stream. All subclasses use a common
 
1527
    SmartClientStreamMediumRequest for their requests, and should implement
 
1528
    _accept_bytes and _read_bytes to allow the request objects to send and
 
1529
    receive bytes.
 
1530
    """
 
1531
 
 
1532
    def __init__(self):
 
1533
        self._current_request = None
 
1534
 
 
1535
    def accept_bytes(self, bytes):
 
1536
        self._accept_bytes(bytes)
 
1537
 
 
1538
    def __del__(self):
 
1539
        """The SmartClientStreamMedium knows how to close the stream when it is
 
1540
        finished with it.
 
1541
        """
 
1542
        self.disconnect()
 
1543
 
 
1544
    def _flush(self):
 
1545
        """Flush the output stream.
 
1546
        
 
1547
        This method is used by the SmartClientStreamMediumRequest to ensure that
 
1548
        all data for a request is sent, to avoid long timeouts or deadlocks.
 
1549
        """
 
1550
        raise NotImplementedError(self._flush)
 
1551
 
 
1552
    def get_request(self):
 
1553
        """See SmartClientMedium.get_request().
 
1554
 
 
1555
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
 
1556
        for get_request.
 
1557
        """
 
1558
        return SmartClientStreamMediumRequest(self)
 
1559
 
 
1560
    def read_bytes(self, count):
 
1561
        return self._read_bytes(count)
 
1562
 
 
1563
 
 
1564
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
 
1565
    """A client medium using simple pipes.
 
1566
    
 
1567
    This client does not manage the pipes: it assumes they will always be open.
 
1568
    """
 
1569
 
 
1570
    def __init__(self, readable_pipe, writeable_pipe):
 
1571
        SmartClientStreamMedium.__init__(self)
 
1572
        self._readable_pipe = readable_pipe
 
1573
        self._writeable_pipe = writeable_pipe
 
1574
 
 
1575
    def _accept_bytes(self, bytes):
 
1576
        """See SmartClientStreamMedium.accept_bytes."""
 
1577
        self._writeable_pipe.write(bytes)
 
1578
 
 
1579
    def _flush(self):
 
1580
        """See SmartClientStreamMedium._flush()."""
 
1581
        self._writeable_pipe.flush()
 
1582
 
 
1583
    def _read_bytes(self, count):
 
1584
        """See SmartClientStreamMedium._read_bytes."""
 
1585
        return self._readable_pipe.read(count)
 
1586
 
 
1587
 
 
1588
class SmartSSHClientMedium(SmartClientStreamMedium):
 
1589
    """A client medium using SSH."""
 
1590
    
 
1591
    def __init__(self, host, port=None, username=None, password=None,
 
1592
            vendor=None):
 
1593
        """Creates a client that will connect on the first use.
 
1594
        
 
1595
        :param vendor: An optional override for the ssh vendor to use. See
 
1596
            bzrlib.transport.ssh for details on ssh vendors.
 
1597
        """
 
1598
        SmartClientStreamMedium.__init__(self)
 
1599
        self._connected = False
 
1600
        self._host = host
 
1601
        self._password = password
 
1602
        self._port = port
 
1603
        self._username = username
 
1604
        self._read_from = None
 
1605
        self._ssh_connection = None
 
1606
        self._vendor = vendor
 
1607
        self._write_to = None
 
1608
 
 
1609
    def _accept_bytes(self, bytes):
 
1610
        """See SmartClientStreamMedium.accept_bytes."""
 
1611
        self._ensure_connection()
 
1612
        self._write_to.write(bytes)
 
1613
 
 
1614
    def disconnect(self):
 
1615
        """See SmartClientMedium.disconnect()."""
 
1616
        if not self._connected:
 
1617
            return
 
1618
        self._read_from.close()
 
1619
        self._write_to.close()
 
1620
        self._ssh_connection.close()
 
1621
        self._connected = False
 
1622
 
 
1623
    def _ensure_connection(self):
 
1624
        """Connect this medium if not already connected."""
 
1625
        if self._connected:
 
1626
            return
 
1627
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
1628
        if self._vendor is None:
 
1629
            vendor = ssh._get_ssh_vendor()
 
1630
        else:
 
1631
            vendor = self._vendor
 
1632
        self._ssh_connection = vendor.connect_ssh(self._username,
 
1633
                self._password, self._host, self._port,
 
1634
                command=[executable, 'serve', '--inet', '--directory=/',
 
1635
                         '--allow-writes'])
 
1636
        self._read_from, self._write_to = \
 
1637
            self._ssh_connection.get_filelike_channels()
 
1638
        self._connected = True
 
1639
 
 
1640
    def _flush(self):
 
1641
        """See SmartClientStreamMedium._flush()."""
 
1642
        self._write_to.flush()
 
1643
 
 
1644
    def _read_bytes(self, count):
 
1645
        """See SmartClientStreamMedium.read_bytes."""
 
1646
        if not self._connected:
 
1647
            raise errors.MediumNotConnected(self)
 
1648
        return self._read_from.read(count)
 
1649
 
 
1650
 
 
1651
class SmartTCPClientMedium(SmartClientStreamMedium):
 
1652
    """A client medium using TCP."""
 
1653
    
 
1654
    def __init__(self, host, port):
 
1655
        """Creates a client that will connect on the first use."""
 
1656
        SmartClientStreamMedium.__init__(self)
 
1657
        self._connected = False
 
1658
        self._host = host
 
1659
        self._port = port
 
1660
        self._socket = None
 
1661
 
 
1662
    def _accept_bytes(self, bytes):
 
1663
        """See SmartClientMedium.accept_bytes."""
 
1664
        self._ensure_connection()
 
1665
        self._socket.sendall(bytes)
 
1666
 
 
1667
    def disconnect(self):
 
1668
        """See SmartClientMedium.disconnect()."""
 
1669
        if not self._connected:
 
1670
            return
 
1671
        self._socket.close()
 
1672
        self._socket = None
 
1673
        self._connected = False
 
1674
 
 
1675
    def _ensure_connection(self):
 
1676
        """Connect this medium if not already connected."""
 
1677
        if self._connected:
 
1678
            return
 
1679
        self._socket = socket.socket()
 
1680
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
1681
        result = self._socket.connect_ex((self._host, int(self._port)))
 
1682
        if result:
 
1683
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
 
1684
                    (self._host, self._port, os.strerror(result)))
 
1685
        self._connected = True
 
1686
 
 
1687
    def _flush(self):
 
1688
        """See SmartClientStreamMedium._flush().
 
1689
        
 
1690
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
1691
        add a means to do a flush, but that can be done in the future.
 
1692
        """
 
1693
 
 
1694
    def _read_bytes(self, count):
 
1695
        """See SmartClientMedium.read_bytes."""
 
1696
        if not self._connected:
 
1697
            raise errors.MediumNotConnected(self)
 
1698
        return self._socket.recv(count)
 
1699
 
 
1700
 
 
1701
class SmartTCPTransport(SmartTransport):
 
1702
    """Connection to smart server over plain tcp.
 
1703
    
 
1704
    This is essentially just a factory to get 'RemoteTransport(url,
 
1705
        SmartTCPClientMedium).
 
1706
    """
 
1707
 
 
1708
    def __init__(self, url):
 
1709
        _scheme, _username, _password, _host, _port, _path = \
 
1710
            transport.split_url(url)
 
1711
        try:
 
1712
            _port = int(_port)
 
1713
        except (ValueError, TypeError), e:
 
1714
            raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
 
1715
        medium = SmartTCPClientMedium(_host, _port)
 
1716
        super(SmartTCPTransport, self).__init__(url, medium=medium)
 
1717
 
 
1718
 
 
1719
try:
 
1720
    from bzrlib.transport import sftp, ssh
 
1721
except errors.ParamikoNotPresent:
 
1722
    # no paramiko, no SSHTransport.
 
1723
    pass
 
1724
else:
 
1725
    class SmartSSHTransport(SmartTransport):
 
1726
        """Connection to smart server over SSH.
 
1727
 
 
1728
        This is essentially just a factory to get 'RemoteTransport(url,
 
1729
            SmartSSHClientMedium).
 
1730
        """
 
1731
 
 
1732
        def __init__(self, url):
 
1733
            _scheme, _username, _password, _host, _port, _path = \
 
1734
                transport.split_url(url)
 
1735
            try:
 
1736
                if _port is not None:
 
1737
                    _port = int(_port)
 
1738
            except (ValueError, TypeError), e:
 
1739
                raise errors.InvalidURL(path=url, extra="invalid port %s" % 
 
1740
                    _port)
 
1741
            medium = SmartSSHClientMedium(_host, _port, _username, _password)
 
1742
            super(SmartSSHTransport, self).__init__(url, medium=medium)
 
1743
 
 
1744
 
 
1745
def get_test_permutations():
 
1746
    """Return (transport, server) permutations for testing."""
 
1747
    ### We may need a little more test framework support to construct an
 
1748
    ### appropriate RemoteTransport in the future.
 
1749
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]