/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

Merge from bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Smart-server protocol, client and server.
 
18
 
 
19
Requests are sent as a command and list of arguments, followed by optional
 
20
bulk body data.  Responses are similarly a response and list of arguments,
 
21
followed by bulk body data. ::
 
22
 
 
23
  SEP := '\001'
 
24
    Fields are separated by Ctrl-A.
 
25
  BULK_DATA := CHUNK+ TRAILER
 
26
    Chunks can be repeated as many times as necessary.
 
27
  CHUNK := CHUNK_LEN CHUNK_BODY
 
28
  CHUNK_LEN := DIGIT+ NEWLINE
 
29
    Gives the number of bytes in the following chunk.
 
30
  CHUNK_BODY := BYTE[chunk_len]
 
31
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
 
32
  SUCCESS_TRAILER := 'done' NEWLINE
 
33
  ERROR_TRAILER := 
 
34
 
 
35
Paths are passed across the network.  The client needs to see a namespace that
 
36
includes any repository that might need to be referenced, and the client needs
 
37
to know about a root directory beyond which it cannot ascend.
 
38
 
 
39
Servers run over ssh will typically want to be able to access any path the user 
 
40
can access.  Public servers on the other hand (which might be over http, ssh
 
41
or tcp) will typically want to restrict access to only a particular directory 
 
42
and its children, so will want to do a software virtual root at that level.
 
43
In other words they'll want to rewrite incoming paths to be under that level
 
44
(and prevent escaping using ../ tricks.)
 
45
 
 
46
URLs that include ~ should probably be passed across to the server verbatim
 
47
and the server can expand them.  This will proably not be meaningful when 
 
48
limited to a directory?
 
49
 
 
50
At the bottom level socket, pipes, HTTP server.  For sockets, we have the
 
51
idea that you have multiple requests and get have a read error because the
 
52
other side did shutdown sd send.  For pipes we have read pipe which will have a
 
53
zero read which marks end-of-file.  For HTTP server environment there is not
 
54
end-of-stream because each request coming into the server is independent.
 
55
 
 
56
So we need a wrapper around pipes and sockets to seperate out reqeusts from
 
57
substrate and this will give us a single model which is consist for HTTP,
 
58
sockets and pipes.
 
59
 
 
60
Server-side
 
61
-----------
 
62
 
 
63
 MEDIUM  (factory for protocol, reads bytes & pushes to protocol,
 
64
          uses protocol to detect end-of-request, sends written
 
65
          bytes to client) e.g. socket, pipe, HTTP request handler.
 
66
  ^
 
67
  | bytes.
 
68
  v
 
69
 
 
70
PROTOCOL  (serialisation, deserialisation)  accepts bytes for one
 
71
          request, decodes according to internal state, pushes
 
72
          structured data to handler.  accepts structured data from
 
73
          handler and encodes and writes to the medium.  factory for
 
74
          handler.
 
75
  ^
 
76
  | structured data
 
77
  v
 
78
 
 
79
HANDLER   (domain logic) accepts structured data, operates state
 
80
          machine until the request can be satisfied,
 
81
          sends structured data to the protocol.
 
82
 
 
83
 
 
84
Client-side
 
85
-----------
 
86
 
 
87
 CLIENT             domain logic, accepts domain requests, generated structured
 
88
                    data, reads structured data from responses and turns into
 
89
                    domain data.  Sends structured data to the protocol.
 
90
                    Operates state machines until the request can be delivered
 
91
                    (e.g. reading from a bundle generated in bzrlib to deliver a
 
92
                    complete request).
 
93
 
 
94
                    Possibly this should just be RemoteBzrDir, RemoteTransport,
 
95
                    ...
 
96
  ^
 
97
  | structured data
 
98
  v
 
99
 
 
100
PROTOCOL  (serialisation, deserialisation)  accepts structured data for one
 
101
          request, encodes and writes to the medium.  Reads bytes from the
 
102
          medium, decodes and allows the client to read structured data.
 
103
  ^
 
104
  | bytes.
 
105
  v
 
106
 
 
107
 MEDIUM  (accepts bytes from the protocol & delivers to the remote server.
 
108
          Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
 
109
"""
 
110
 
 
111
 
 
112
# TODO: _translate_error should be on the client, not the transport because
 
113
#     error coding is wire protocol specific.
 
114
 
 
115
# TODO: A plain integer from query_version is too simple; should give some
 
116
# capabilities too?
 
117
 
 
118
# TODO: Server should probably catch exceptions within itself and send them
 
119
# back across the network.  (But shouldn't catch KeyboardInterrupt etc)
 
120
# Also needs to somehow report protocol errors like bad requests.  Need to
 
121
# consider how we'll handle error reporting, e.g. if we get halfway through a
 
122
# bulk transfer and then something goes wrong.
 
123
 
 
124
# TODO: Standard marker at start of request/response lines?
 
125
 
 
126
# TODO: Make each request and response self-validatable, e.g. with checksums.
 
127
#
 
128
# TODO: get/put objects could be changed to gradually read back the data as it
 
129
# comes across the network
 
130
#
 
131
# TODO: What should the server do if it hits an error and has to terminate?
 
132
#
 
133
# TODO: is it useful to allow multiple chunks in the bulk data?
 
134
#
 
135
# TODO: If we get an exception during transmission of bulk data we can't just
 
136
# emit the exception because it won't be seen.
 
137
#   John proposes:  I think it would be worthwhile to have a header on each
 
138
#   chunk, that indicates it is another chunk. Then you can send an 'error'
 
139
#   chunk as long as you finish the previous chunk.
 
140
#
 
141
# TODO: Clone method on Transport; should work up towards parent directory;
 
142
# unclear how this should be stored or communicated to the server... maybe
 
143
# just pass it on all relevant requests?
 
144
#
 
145
# TODO: Better name than clone() for changing between directories.  How about
 
146
# open_dir or change_dir or chdir?
 
147
#
 
148
# TODO: Is it really good to have the notion of current directory within the
 
149
# connection?  Perhaps all Transports should factor out a common connection
 
150
# from the thing that has the directory context?
 
151
#
 
152
# TODO: Pull more things common to sftp and ssh to a higher level.
 
153
#
 
154
# TODO: The server that manages a connection should be quite small and retain
 
155
# minimum state because each of the requests are supposed to be stateless.
 
156
# Then we can write another implementation that maps to http.
 
157
#
 
158
# TODO: What to do when a client connection is garbage collected?  Maybe just
 
159
# abruptly drop the connection?
 
160
#
 
161
# TODO: Server in some cases will need to restrict access to files outside of
 
162
# a particular root directory.  LocalTransport doesn't do anything to stop you
 
163
# ascending above the base directory, so we need to prevent paths
 
164
# containing '..' in either the server or transport layers.  (Also need to
 
165
# consider what happens if someone creates a symlink pointing outside the 
 
166
# directory tree...)
 
167
#
 
168
# TODO: Server should rebase absolute paths coming across the network to put
 
169
# them under the virtual root, if one is in use.  LocalTransport currently
 
170
# doesn't do that; if you give it an absolute path it just uses it.
 
171
 
172
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
 
173
# urlescape them instead.  Indeed possibly this should just literally be
 
174
# http-over-ssh.
 
175
#
 
176
# FIXME: This transport, with several others, has imperfect handling of paths
 
177
# within urls.  It'd probably be better for ".." from a root to raise an error
 
178
# rather than return the same directory as we do at present.
 
179
#
 
180
# TODO: Rather than working at the Transport layer we want a Branch,
 
181
# Repository or BzrDir objects that talk to a server.
 
182
#
 
183
# TODO: Probably want some way for server commands to gradually produce body
 
184
# data rather than passing it as a string; they could perhaps pass an
 
185
# iterator-like callback that will gradually yield data; it probably needs a
 
186
# close() method that will always be closed to do any necessary cleanup.
 
187
#
 
188
# TODO: Split the actual smart server from the ssh encoding of it.
 
189
#
 
190
# TODO: Perhaps support file-level readwrite operations over the transport
 
191
# too.
 
192
#
 
193
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
 
194
# branch doing file-level operations.
 
195
#
 
196
# TODO: jam 20060915 _decode_tuple is acting directly on input over
 
197
#       the socket, and it assumes everything is UTF8 sections separated
 
198
#       by \001. Which means a request like '\002' Will abort the connection
 
199
#       because of a UnicodeDecodeError. It does look like invalid data will
 
200
#       kill the SmartServerStreamMedium, but only with an abort + exception, and 
 
201
#       the overall server shouldn't die.
 
202
 
 
203
from cStringIO import StringIO
 
204
import os
 
205
import socket
 
206
import tempfile
 
207
import threading
 
208
import urllib
 
209
import urlparse
 
210
 
 
211
from bzrlib import (
 
212
    bzrdir,
 
213
    errors,
 
214
    revision,
 
215
    transport,
 
216
    trace,
 
217
    urlutils,
 
218
    )
 
219
from bzrlib.bundle.serializer import write_bundle
 
220
 
 
221
# must do this otherwise urllib can't parse the urls properly :(
 
222
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
 
223
    transport.register_urlparse_netloc_protocol(scheme)
 
224
del scheme
 
225
 
 
226
 
 
227
def _recv_tuple(from_file):
 
228
    req_line = from_file.readline()
 
229
    return _decode_tuple(req_line)
 
230
 
 
231
 
 
232
def _decode_tuple(req_line):
 
233
    if req_line == None or req_line == '':
 
234
        return None
 
235
    if req_line[-1] != '\n':
 
236
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
 
237
    return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
 
238
 
 
239
 
 
240
def _encode_tuple(args):
 
241
    """Encode the tuple args to a bytestream."""
 
242
    return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
 
243
 
 
244
 
 
245
class SmartProtocolBase(object):
 
246
    """Methods common to client and server"""
 
247
 
 
248
    # TODO: this only actually accomodates a single block; possibly should support
 
249
    # multiple chunks?
 
250
    def _recv_bulk(self):
 
251
        # This is OBSOLETE except for the double handline in the server: 
 
252
        # the read_bulk + reencode noise.
 
253
        chunk_len = self._in.readline()
 
254
        try:
 
255
            chunk_len = int(chunk_len)
 
256
        except ValueError:
 
257
            raise errors.SmartProtocolError("bad chunk length line %r" % chunk_len)
 
258
        bulk = self._in.read(chunk_len)
 
259
        if len(bulk) != chunk_len:
 
260
            raise errors.SmartProtocolError("short read fetching bulk data chunk")
 
261
        self._recv_trailer()
 
262
        return bulk
 
263
 
 
264
    def _recv_trailer(self):
 
265
        resp = self._recv_tuple()
 
266
        if resp == ('done', ):
 
267
            return
 
268
        else:
 
269
            self._translate_error(resp)
 
270
 
 
271
    def _encode_bulk_data(self, body):
 
272
        """Encode body as a bulk data chunk."""
 
273
        return ''.join(('%d\n' % len(body), body, 'done\n'))
 
274
 
 
275
    def _serialise_offsets(self, offsets):
 
276
        """Serialise a readv offset list."""
 
277
        txt = []
 
278
        for start, length in offsets:
 
279
            txt.append('%d,%d' % (start, length))
 
280
        return '\n'.join(txt)
 
281
 
 
282
    def _send_bulk_data(self, body, a_file=None):
 
283
        """Send chunked body data"""
 
284
        assert isinstance(body, str)
 
285
        bytes = self._encode_bulk_data(body)
 
286
        self._write_and_flush(bytes, a_file)
 
287
 
 
288
    def _write_and_flush(self, bytes, a_file=None):
 
289
        """Write bytes to self._out and flush it."""
 
290
        # XXX: this will be inefficient.  Just ask Robert.
 
291
        if a_file is None:
 
292
            a_file = self._out
 
293
        a_file.write(bytes)
 
294
        a_file.flush()
 
295
        
 
296
 
 
297
class SmartServerRequestProtocolOne(SmartProtocolBase):
 
298
    """Server-side encoding and decoding logic for smart version 1."""
 
299
    
 
300
    def __init__(self, output_stream, backing_transport):
 
301
        self._out_stream = output_stream
 
302
        self._backing_transport = backing_transport
 
303
        self.excess_buffer = ''
 
304
        self.finished_reading = False
 
305
        self.in_buffer = ''
 
306
        self.has_dispatched = False
 
307
        self.request = None
 
308
        self._body_decoder = None
 
309
 
 
310
    def accept_bytes(self, bytes):
 
311
        """Take bytes, and advance the internal state machine appropriately.
 
312
        
 
313
        :param bytes: must be a byte string
 
314
        """
 
315
        assert isinstance(bytes, str)
 
316
        self.in_buffer += bytes
 
317
        if not self.has_dispatched:
 
318
            if '\n' not in self.in_buffer:
 
319
                # no command line yet
 
320
                return
 
321
            self.has_dispatched = True
 
322
            # XXX if in_buffer not \n-terminated this will do the wrong
 
323
            # thing.
 
324
            try:
 
325
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
 
326
                first_line += '\n'
 
327
                req_args = _decode_tuple(first_line)
 
328
                self.request = SmartServerRequestHandler(
 
329
                    self._backing_transport)
 
330
                self.request.dispatch_command(req_args[0], req_args[1:])
 
331
                if self.request.finished_reading:
 
332
                    # trivial request
 
333
                    self.excess_buffer = self.in_buffer
 
334
                    self.in_buffer = ''
 
335
                    self._send_response(self.request.response.args,
 
336
                        self.request.response.body)
 
337
                self.sync_with_request(self.request)
 
338
                return self.request
 
339
            except KeyboardInterrupt:
 
340
                raise
 
341
            except Exception, exception:
 
342
                # everything else: pass to client, flush, and quit
 
343
                self._send_response(('error', str(exception)))
 
344
                return None
 
345
 
 
346
        else:
 
347
            if self.finished_reading:
 
348
                # nothing to do.XXX: this routine should be a single state 
 
349
                # machine too.
 
350
                self.excess_buffer += self.in_buffer
 
351
                self.in_buffer = ''
 
352
                return
 
353
            if self._body_decoder is None:
 
354
                self._body_decoder = LengthPrefixedBodyDecoder()
 
355
            self._body_decoder.accept_bytes(self.in_buffer)
 
356
            self.in_buffer = self._body_decoder.unused_data
 
357
            body_data = self._body_decoder.read_pending_data()
 
358
            self.request.accept_body(body_data)
 
359
            if self._body_decoder.finished_reading:
 
360
                self.request.end_of_body()
 
361
                assert self.request.finished_reading, \
 
362
                    "no more body, request not finished"
 
363
            self.sync_with_request(self.request)
 
364
            if self.request.response is not None:
 
365
                self._send_response(self.request.response.args,
 
366
                    self.request.response.body)
 
367
                self.excess_buffer = self.in_buffer
 
368
                self.in_buffer = ''
 
369
            else:
 
370
                assert not self.request.finished_reading, \
 
371
                    "no response and we have finished reading."
 
372
 
 
373
    def _send_response(self, args, body=None):
 
374
        """Send a smart server response down the output stream."""
 
375
        self._out_stream.write(_encode_tuple(args))
 
376
        if body is None:
 
377
            self._out_stream.flush()
 
378
        else:
 
379
            self._send_bulk_data(body, self._out_stream)
 
380
            #self._out_stream.write('BLARGH')
 
381
 
 
382
    def sync_with_request(self, request):
 
383
        self.finished_reading = request.finished_reading
 
384
        
 
385
 
 
386
class LengthPrefixedBodyDecoder(object):
 
387
    """Decodes the length-prefixed bulk data."""
 
388
    
 
389
    def __init__(self):
 
390
        self.bytes_left = None
 
391
        self.finished_reading = False
 
392
        self.unused_data = ''
 
393
        self.state_accept = self._state_accept_expecting_length
 
394
        self.state_read = self._state_read_no_data
 
395
        self._in_buffer = ''
 
396
        self._trailer_buffer = ''
 
397
    
 
398
    def accept_bytes(self, bytes):
 
399
        """Decode as much of bytes as possible.
 
400
 
 
401
        If 'bytes' contains too much data it will be appended to
 
402
        self.unused_data.
 
403
 
 
404
        finished_reading will be set when no more data is required.  Further
 
405
        data will be appended to self.unused_data.
 
406
        """
 
407
        # accept_bytes is allowed to change the state
 
408
        current_state = self.state_accept
 
409
        self.state_accept(bytes)
 
410
        while current_state != self.state_accept:
 
411
            current_state = self.state_accept
 
412
            self.state_accept('')
 
413
 
 
414
    def next_read_size(self):
 
415
        if self.bytes_left is not None:
 
416
            # Ideally we want to read all the remainder of the body and the
 
417
            # trailer in one go.
 
418
            return self.bytes_left + 5
 
419
        elif self.state_accept == self._state_accept_reading_trailer:
 
420
            # Just the trailer left
 
421
            return 5 - len(self._trailer_buffer)
 
422
        elif self.state_accept == self._state_accept_expecting_length:
 
423
            # There's still at least 6 bytes left ('\n' to end the length, plus
 
424
            # 'done\n').
 
425
            return 6
 
426
        else:
 
427
            # Reading excess data.  Either way, 1 byte at a time is fine.
 
428
            return 1
 
429
        
 
430
    def read_pending_data(self):
 
431
        """Return any pending data that has been decoded."""
 
432
        return self.state_read()
 
433
 
 
434
    def _state_accept_expecting_length(self, bytes):
 
435
        self._in_buffer += bytes
 
436
        pos = self._in_buffer.find('\n')
 
437
        if pos == -1:
 
438
            return
 
439
        self.bytes_left = int(self._in_buffer[:pos])
 
440
        self._in_buffer = self._in_buffer[pos+1:]
 
441
        self.bytes_left -= len(self._in_buffer)
 
442
        self.state_accept = self._state_accept_reading_body
 
443
        self.state_read = self._state_read_in_buffer
 
444
 
 
445
    def _state_accept_reading_body(self, bytes):
 
446
        self._in_buffer += bytes
 
447
        self.bytes_left -= len(bytes)
 
448
        if self.bytes_left <= 0:
 
449
            # Finished with body
 
450
            if self.bytes_left != 0:
 
451
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
 
452
                self._in_buffer = self._in_buffer[:self.bytes_left]
 
453
            self.bytes_left = None
 
454
            self.state_accept = self._state_accept_reading_trailer
 
455
        
 
456
    def _state_accept_reading_trailer(self, bytes):
 
457
        self._trailer_buffer += bytes
 
458
        # TODO: what if the trailer does not match "done\n"?  Should this raise
 
459
        # a ProtocolViolation exception?
 
460
        if self._trailer_buffer.startswith('done\n'):
 
461
            self.unused_data = self._trailer_buffer[len('done\n'):]
 
462
            self.state_accept = self._state_accept_reading_unused
 
463
            self.finished_reading = True
 
464
    
 
465
    def _state_accept_reading_unused(self, bytes):
 
466
        self.unused_data += bytes
 
467
 
 
468
    def _state_read_no_data(self):
 
469
        return ''
 
470
 
 
471
    def _state_read_in_buffer(self):
 
472
        result = self._in_buffer
 
473
        self._in_buffer = ''
 
474
        return result
 
475
 
 
476
 
 
477
class SmartServerStreamMedium(SmartProtocolBase):
 
478
    """Handles smart commands coming over a stream.
 
479
 
 
480
    The stream may be a pipe connected to sshd, or a tcp socket, or an
 
481
    in-process fifo for testing.
 
482
 
 
483
    One instance is created for each connected client; it can serve multiple
 
484
    requests in the lifetime of the connection.
 
485
 
 
486
    The server passes requests through to an underlying backing transport, 
 
487
    which will typically be a LocalTransport looking at the server's filesystem.
 
488
    """
 
489
 
 
490
    def __init__(self, in_file, out_file, backing_transport):
 
491
        """Construct new server.
 
492
 
 
493
        :param in_file: Python file from which requests can be read.
 
494
        :param out_file: Python file to write responses.
 
495
        :param backing_transport: Transport for the directory served.
 
496
        """
 
497
        self._in = in_file
 
498
        self._out = out_file
 
499
        self.backing_transport = backing_transport
 
500
 
 
501
    def _recv_tuple(self):
 
502
        """Read a request from the client and return as a tuple.
 
503
        
 
504
        Returns None at end of file (if the client closed the connection.)
 
505
        """
 
506
        # ** Deserialise and read bytes
 
507
        return _recv_tuple(self._in)
 
508
 
 
509
    def _send_tuple(self, args):
 
510
        """Send response header"""
 
511
        # ** serialise and write bytes
 
512
        return self._write_and_flush(_encode_tuple(args))
 
513
 
 
514
    def _send_error_and_disconnect(self, exception):
 
515
        # ** serialise and write bytes
 
516
        self._send_tuple(('error', str(exception)))
 
517
        ## self._out.close()
 
518
        ## self._in.close()
 
519
 
 
520
    def _serve_one_request(self):
 
521
        """Read one request from input, process, send back a response.
 
522
        
 
523
        :return: False if the server should terminate, otherwise None.
 
524
        """
 
525
        # ** deserialise, read bytes, serialise and write bytes
 
526
        req_line = self._in.readline()
 
527
        # this should just test "req_line == ''", surely?  -- Andrew Bennetts
 
528
        if req_line in ('', None):
 
529
            # client closed connection
 
530
            return False  # shutdown server
 
531
        try:
 
532
            protocol = SmartServerRequestProtocolOne(self._out,
 
533
                self.backing_transport)
 
534
            protocol.accept_bytes(req_line)
 
535
            if not protocol.finished_reading:
 
536
                # this boils down to readline which wont block on open sockets
 
537
                # without data. We should really though read as much as is
 
538
                # available and then hand to that accept_bytes without this
 
539
                # silly double-decode.
 
540
                bulk = self._recv_bulk()
 
541
                bulk_bytes = ''.join(('%d\n' % len(bulk), bulk, 'done\n'))
 
542
                protocol.accept_bytes(bulk_bytes)
 
543
                # might be nice to do protocol.end_of_bytes()
 
544
                # because self._recv_bulk reads all the bytes, this must finish
 
545
                # after one delivery of data rather than looping.
 
546
                assert protocol.finished_reading
 
547
        except KeyboardInterrupt:
 
548
            raise
 
549
        except Exception, e:
 
550
            # everything else: pass to client, flush, and quit
 
551
            self._send_error_and_disconnect(e)
 
552
            return False
 
553
 
 
554
    def serve(self):
 
555
        """Serve requests until the client disconnects."""
 
556
        # Keep a reference to stderr because the sys module's globals get set to
 
557
        # None during interpreter shutdown.
 
558
        from sys import stderr
 
559
        try:
 
560
            while self._serve_one_request() != False:
 
561
                pass
 
562
        except Exception, e:
 
563
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
564
            raise
 
565
 
 
566
 
 
567
class SmartServerResponse(object):
 
568
    """Response generated by SmartServerRequestHandler."""
 
569
 
 
570
    def __init__(self, args, body=None):
 
571
        self.args = args
 
572
        self.body = body
 
573
 
 
574
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
 
575
# for delivering the data for a request. This could be done with as the
 
576
# StreamServer, though that would create conflation between request and response
 
577
# which may be undesirable.
 
578
 
 
579
 
 
580
class SmartServerRequestHandler(object):
 
581
    """Protocol logic for smart server.
 
582
    
 
583
    This doesn't handle serialization at all, it just processes requests and
 
584
    creates responses.
 
585
    """
 
586
 
 
587
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
 
588
    # not contain encoding or decoding logic to allow the wire protocol to vary
 
589
    # from the object protocol: we will want to tweak the wire protocol separate
 
590
    # from the object model, and ideally we will be able to do that without
 
591
    # having a SmartServerRequestHandler subclass for each wire protocol, rather
 
592
    # just a Protocol subclass.
 
593
 
 
594
    # TODO: Better way of representing the body for commands that take it,
 
595
    # and allow it to be streamed into the server.
 
596
    
 
597
    def __init__(self, backing_transport):
 
598
        self._backing_transport = backing_transport
 
599
        self._converted_command = False
 
600
        self.finished_reading = False
 
601
        self._body_bytes = ''
 
602
        self.response = None
 
603
 
 
604
    def accept_body(self, bytes):
 
605
        """Accept body data.
 
606
 
 
607
        This should be overriden for each command that desired body data to
 
608
        handle the right format of that data. I.e. plain bytes, a bundle etc.
 
609
 
 
610
        The deserialisation into that format should be done in the Protocol
 
611
        object. Set self.desired_body_format to the format your method will
 
612
        handle.
 
613
        """
 
614
        # default fallback is to accumulate bytes.
 
615
        self._body_bytes += bytes
 
616
        
 
617
    def _end_of_body_handler(self):
 
618
        """An unimplemented end of body handler."""
 
619
        raise NotImplementedError(self._end_of_body_handler)
 
620
        
 
621
    def do_hello(self):
 
622
        """Answer a version request with my version."""
 
623
        return SmartServerResponse(('ok', '1'))
 
624
 
 
625
    def do_has(self, relpath):
 
626
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
 
627
        return SmartServerResponse((r,))
 
628
 
 
629
    def do_get(self, relpath):
 
630
        backing_bytes = self._backing_transport.get_bytes(relpath)
 
631
        return SmartServerResponse(('ok',), backing_bytes)
 
632
 
 
633
    def _deserialise_optional_mode(self, mode):
 
634
        # XXX: FIXME this should be on the protocol object.
 
635
        if mode == '':
 
636
            return None
 
637
        else:
 
638
            return int(mode)
 
639
 
 
640
    def do_append(self, relpath, mode):
 
641
        self._converted_command = True
 
642
        self._relpath = relpath
 
643
        self._mode = self._deserialise_optional_mode(mode)
 
644
        self._end_of_body_handler = self._handle_do_append_end
 
645
    
 
646
    def _handle_do_append_end(self):
 
647
        old_length = self._backing_transport.append_bytes(
 
648
            self._relpath, self._body_bytes, self._mode)
 
649
        self.response = SmartServerResponse(('appended', '%d' % old_length))
 
650
 
 
651
    def do_delete(self, relpath):
 
652
        self._backing_transport.delete(relpath)
 
653
 
 
654
    def do_iter_files_recursive(self, abspath):
 
655
        # XXX: the path handling needs some thought.
 
656
        #relpath = self._backing_transport.relpath(abspath)
 
657
        transport = self._backing_transport.clone(abspath)
 
658
        filenames = transport.iter_files_recursive()
 
659
        return SmartServerResponse(('names',) + tuple(filenames))
 
660
 
 
661
    def do_list_dir(self, relpath):
 
662
        filenames = self._backing_transport.list_dir(relpath)
 
663
        return SmartServerResponse(('names',) + tuple(filenames))
 
664
 
 
665
    def do_mkdir(self, relpath, mode):
 
666
        self._backing_transport.mkdir(relpath,
 
667
                                      self._deserialise_optional_mode(mode))
 
668
 
 
669
    def do_move(self, rel_from, rel_to):
 
670
        self._backing_transport.move(rel_from, rel_to)
 
671
 
 
672
    def do_put(self, relpath, mode):
 
673
        self._converted_command = True
 
674
        self._relpath = relpath
 
675
        self._mode = self._deserialise_optional_mode(mode)
 
676
        self._end_of_body_handler = self._handle_do_put
 
677
 
 
678
    def _handle_do_put(self):
 
679
        self._backing_transport.put_bytes(self._relpath,
 
680
                self._body_bytes, self._mode)
 
681
        self.response = SmartServerResponse(('ok',))
 
682
 
 
683
    def _deserialise_offsets(self, text):
 
684
        # XXX: FIXME this should be on the protocol object.
 
685
        offsets = []
 
686
        for line in text.split('\n'):
 
687
            if not line:
 
688
                continue
 
689
            start, length = line.split(',')
 
690
            offsets.append((int(start), int(length)))
 
691
        return offsets
 
692
 
 
693
    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
 
694
        self._converted_command = True
 
695
        self._end_of_body_handler = self._handle_put_non_atomic
 
696
        self._relpath = relpath
 
697
        self._dir_mode = self._deserialise_optional_mode(dir_mode)
 
698
        self._mode = self._deserialise_optional_mode(mode)
 
699
        # a boolean would be nicer XXX
 
700
        self._create_parent = (create_parent == 'T')
 
701
 
 
702
    def _handle_put_non_atomic(self):
 
703
        self._backing_transport.put_bytes_non_atomic(self._relpath,
 
704
                self._body_bytes,
 
705
                mode=self._mode,
 
706
                create_parent_dir=self._create_parent,
 
707
                dir_mode=self._dir_mode)
 
708
        self.response = SmartServerResponse(('ok',))
 
709
 
 
710
    def do_readv(self, relpath):
 
711
        self._converted_command = True
 
712
        self._end_of_body_handler = self._handle_readv_offsets
 
713
        self._relpath = relpath
 
714
 
 
715
    def end_of_body(self):
 
716
        """No more body data will be received."""
 
717
        self._run_handler_code(self._end_of_body_handler, (), {})
 
718
        # cannot read after this.
 
719
        self.finished_reading = True
 
720
 
 
721
    def _handle_readv_offsets(self):
 
722
        """accept offsets for a readv request."""
 
723
        offsets = self._deserialise_offsets(self._body_bytes)
 
724
        backing_bytes = ''.join(bytes for offset, bytes in
 
725
            self._backing_transport.readv(self._relpath, offsets))
 
726
        self.response = SmartServerResponse(('readv',), backing_bytes)
 
727
        
 
728
    def do_rename(self, rel_from, rel_to):
 
729
        self._backing_transport.rename(rel_from, rel_to)
 
730
 
 
731
    def do_rmdir(self, relpath):
 
732
        self._backing_transport.rmdir(relpath)
 
733
 
 
734
    def do_stat(self, relpath):
 
735
        stat = self._backing_transport.stat(relpath)
 
736
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
 
737
        
 
738
    def do_get_bundle(self, path, revision_id):
 
739
        # open transport relative to our base
 
740
        t = self._backing_transport.clone(path)
 
741
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
 
742
        repo = control.open_repository()
 
743
        tmpf = tempfile.TemporaryFile()
 
744
        base_revision = revision.NULL_REVISION
 
745
        write_bundle(repo, revision_id, base_revision, tmpf)
 
746
        tmpf.seek(0)
 
747
        return SmartServerResponse((), tmpf.read())
 
748
 
 
749
    def dispatch_command(self, cmd, args):
 
750
        """Deprecated compatibility method.""" # XXX XXX
 
751
        func = getattr(self, 'do_' + cmd, None)
 
752
        if func is None:
 
753
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
 
754
        self._run_handler_code(func, args, {})
 
755
 
 
756
    def _run_handler_code(self, callable, args, kwargs):
 
757
        """Run some handler specific code 'callable'.
 
758
 
 
759
        If a result is returned, it is considered to be the commands response,
 
760
        and finished_reading is set true, and its assigned to self.response.
 
761
 
 
762
        Any exceptions caught are translated and a response object created
 
763
        from them.
 
764
        """
 
765
        result = self._call_converting_errors(callable, args, kwargs)
 
766
        if result is not None:
 
767
            self.response = result
 
768
            self.finished_reading = True
 
769
        # handle unconverted commands
 
770
        if not self._converted_command:
 
771
            self.finished_reading = True
 
772
            if result is None:
 
773
                self.response = SmartServerResponse(('ok',))
 
774
 
 
775
    def _call_converting_errors(self, callable, args, kwargs):
 
776
        """Call callable converting errors to Response objects."""
 
777
        try:
 
778
            return callable(*args, **kwargs)
 
779
        except errors.NoSuchFile, e:
 
780
            return SmartServerResponse(('NoSuchFile', e.path))
 
781
        except errors.FileExists, e:
 
782
            return SmartServerResponse(('FileExists', e.path))
 
783
        except errors.DirectoryNotEmpty, e:
 
784
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
 
785
        except errors.ShortReadvError, e:
 
786
            return SmartServerResponse(('ShortReadvError',
 
787
                e.path, str(e.offset), str(e.length), str(e.actual)))
 
788
        except UnicodeError, e:
 
789
            # If it is a DecodeError, than most likely we are starting
 
790
            # with a plain string
 
791
            str_or_unicode = e.object
 
792
            if isinstance(str_or_unicode, unicode):
 
793
                val = u'u:' + str_or_unicode
 
794
            else:
 
795
                val = u's:' + str_or_unicode.encode('base64')
 
796
            # This handles UnicodeEncodeError or UnicodeDecodeError
 
797
            return SmartServerResponse((e.__class__.__name__,
 
798
                    e.encoding, val, str(e.start), str(e.end), e.reason))
 
799
        except errors.TransportNotPossible, e:
 
800
            if e.msg == "readonly transport":
 
801
                return SmartServerResponse(('ReadOnlyError', ))
 
802
            else:
 
803
                raise
 
804
 
 
805
 
 
806
class SmartTCPServer(object):
 
807
    """Listens on a TCP socket and accepts connections from smart clients"""
 
808
 
 
809
    def __init__(self, backing_transport, host='127.0.0.1', port=0):
 
810
        """Construct a new server.
 
811
 
 
812
        To actually start it running, call either start_background_thread or
 
813
        serve.
 
814
 
 
815
        :param host: Name of the interface to listen on.
 
816
        :param port: TCP port to listen on, or 0 to allocate a transient port.
 
817
        """
 
818
        self._server_socket = socket.socket()
 
819
        self._server_socket.bind((host, port))
 
820
        self.port = self._server_socket.getsockname()[1]
 
821
        self._server_socket.listen(1)
 
822
        self._server_socket.settimeout(1)
 
823
        self.backing_transport = backing_transport
 
824
 
 
825
    def serve(self):
 
826
        # let connections timeout so that we get a chance to terminate
 
827
        # Keep a reference to the exceptions we want to catch because the socket
 
828
        # module's globals get set to None during interpreter shutdown.
 
829
        from socket import timeout as socket_timeout
 
830
        from socket import error as socket_error
 
831
        self._should_terminate = False
 
832
        while not self._should_terminate:
 
833
            try:
 
834
                self.accept_and_serve()
 
835
            except socket_timeout:
 
836
                # just check if we're asked to stop
 
837
                pass
 
838
            except socket_error, e:
 
839
                trace.warning("client disconnected: %s", e)
 
840
                pass
 
841
 
 
842
    def get_url(self):
 
843
        """Return the url of the server"""
 
844
        return "bzr://%s:%d/" % self._server_socket.getsockname()
 
845
 
 
846
    def accept_and_serve(self):
 
847
        conn, client_addr = self._server_socket.accept()
 
848
        # For WIN32, where the timeout value from the listening socket
 
849
        # propogates to the newly accepted socket.
 
850
        conn.setblocking(True)
 
851
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
852
        from_client = conn.makefile('r')
 
853
        to_client = conn.makefile('w')
 
854
        handler = SmartServerStreamMedium(from_client, to_client,
 
855
                self.backing_transport)
 
856
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
 
857
        connection_thread.setDaemon(True)
 
858
        connection_thread.start()
 
859
 
 
860
    def start_background_thread(self):
 
861
        self._server_thread = threading.Thread(None,
 
862
                self.serve,
 
863
                name='server-' + self.get_url())
 
864
        self._server_thread.setDaemon(True)
 
865
        self._server_thread.start()
 
866
 
 
867
    def stop_background_thread(self):
 
868
        self._should_terminate = True
 
869
        # self._server_socket.close()
 
870
        # we used to join the thread, but it's not really necessary; it will
 
871
        # terminate in time
 
872
        ## self._server_thread.join()
 
873
 
 
874
 
 
875
class SmartTCPServer_for_testing(SmartTCPServer):
 
876
    """Server suitable for use by transport tests.
 
877
    
 
878
    This server is backed by the process's cwd.
 
879
    """
 
880
 
 
881
    def __init__(self):
 
882
        self._homedir = os.getcwd()
 
883
        # The server is set up by default like for ssh access: the client
 
884
        # passes filesystem-absolute paths; therefore the server must look
 
885
        # them up relative to the root directory.  it might be better to act
 
886
        # a public server and have the server rewrite paths into the test
 
887
        # directory.
 
888
        SmartTCPServer.__init__(self, transport.get_transport("file:///"))
 
889
        
 
890
    def setUp(self):
 
891
        """Set up server for testing"""
 
892
        self.start_background_thread()
 
893
 
 
894
    def tearDown(self):
 
895
        self.stop_background_thread()
 
896
 
 
897
    def get_url(self):
 
898
        """Return the url of the server"""
 
899
        host, port = self._server_socket.getsockname()
 
900
        # XXX: I think this is likely to break on windows -- self._homedir will
 
901
        # have backslashes (and maybe a drive letter?).
 
902
        #  -- Andrew Bennetts, 2006-08-29
 
903
        return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
 
904
 
 
905
    def get_bogus_url(self):
 
906
        """Return a URL which will fail to connect"""
 
907
        return 'bzr://127.0.0.1:1/'
 
908
 
 
909
 
 
910
class SmartStat(object):
 
911
 
 
912
    def __init__(self, size, mode):
 
913
        self.st_size = size
 
914
        self.st_mode = mode
 
915
 
 
916
 
 
917
class SmartTransport(transport.Transport):
 
918
    """Connection to a smart server.
 
919
 
 
920
    The connection holds references to pipes that can be used to send requests
 
921
    to the server.
 
922
 
 
923
    The connection has a notion of the current directory to which it's
 
924
    connected; this is incorporated in filenames passed to the server.
 
925
    
 
926
    This supports some higher-level RPC operations and can also be treated 
 
927
    like a Transport to do file-like operations.
 
928
 
 
929
    The connection can be made over a tcp socket, or (in future) an ssh pipe
 
930
    or a series of http requests.  There are concrete subclasses for each
 
931
    type: SmartTCPTransport, etc.
 
932
    """
 
933
 
 
934
    # IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
 
935
    # responsibilities: Put those on SmartClient or similar. This is vital for
 
936
    # the ability to support multiple versions of the smart protocol over time:
 
937
    # SmartTransport is an adapter from the Transport object model to the 
 
938
    # SmartClient model, not an encoder.
 
939
 
 
940
    def __init__(self, url, clone_from=None, medium=None):
 
941
        """Constructor.
 
942
 
 
943
        :param medium: The medium to use for this RemoteTransport. This must be
 
944
            supplied if clone_from is None.
 
945
        """
 
946
        ### Technically super() here is faulty because Transport's __init__
 
947
        ### fails to take 2 parameters, and if super were to choose a silly
 
948
        ### initialisation order things would blow up. 
 
949
        if not url.endswith('/'):
 
950
            url += '/'
 
951
        super(SmartTransport, self).__init__(url)
 
952
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
 
953
                transport.split_url(url)
 
954
        if clone_from is None:
 
955
            self._medium = medium
 
956
        else:
 
957
            # credentials may be stripped from the base in some circumstances
 
958
            # as yet to be clearly defined or documented, so copy them.
 
959
            self._username = clone_from._username
 
960
            # reuse same connection
 
961
            self._medium = clone_from._medium
 
962
        assert self._medium is not None
 
963
 
 
964
    def abspath(self, relpath):
 
965
        """Return the full url to the given relative path.
 
966
        
 
967
        @param relpath: the relative path or path components
 
968
        @type relpath: str or list
 
969
        """
 
970
        return self._unparse_url(self._remote_path(relpath))
 
971
    
 
972
    def clone(self, relative_url):
 
973
        """Make a new SmartTransport related to me, sharing the same connection.
 
974
 
 
975
        This essentially opens a handle on a different remote directory.
 
976
        """
 
977
        if relative_url is None:
 
978
            return SmartTransport(self.base, self)
 
979
        else:
 
980
            return SmartTransport(self.abspath(relative_url), self)
 
981
 
 
982
    def is_readonly(self):
 
983
        """Smart server transport can do read/write file operations."""
 
984
        return False
 
985
                                                   
 
986
    def get_smart_client(self):
 
987
        return self._medium
 
988
 
 
989
    def get_smart_medium(self):
 
990
        return self._medium
 
991
                                                   
 
992
    def _unparse_url(self, path):
 
993
        """Return URL for a path.
 
994
 
 
995
        :see: SFTPUrlHandling._unparse_url
 
996
        """
 
997
        # TODO: Eventually it should be possible to unify this with
 
998
        # SFTPUrlHandling._unparse_url?
 
999
        if path == '':
 
1000
            path = '/'
 
1001
        path = urllib.quote(path)
 
1002
        netloc = urllib.quote(self._host)
 
1003
        if self._username is not None:
 
1004
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
 
1005
        if self._port is not None:
 
1006
            netloc = '%s:%d' % (netloc, self._port)
 
1007
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
 
1008
 
 
1009
    def _remote_path(self, relpath):
 
1010
        """Returns the Unicode version of the absolute path for relpath."""
 
1011
        return self._combine_paths(self._path, relpath)
 
1012
 
 
1013
    def _call(self, method, *args):
 
1014
        resp = self._call2(method, *args)
 
1015
        self._translate_error(resp)
 
1016
 
 
1017
    def _call2(self, method, *args):
 
1018
        """Call a method on the remote server."""
 
1019
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1020
        protocol.call(method, *args)
 
1021
        return protocol.read_response_tuple()
 
1022
 
 
1023
    def _call_with_body_bytes(self, method, args, body):
 
1024
        """Call a method on the remote server with body bytes."""
 
1025
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1026
        protocol.call_with_body_bytes((method, ) + args, body)
 
1027
        return protocol.read_response_tuple()
 
1028
 
 
1029
    def has(self, relpath):
 
1030
        """Indicate whether a remote file of the given name exists or not.
 
1031
 
 
1032
        :see: Transport.has()
 
1033
        """
 
1034
        resp = self._call2('has', self._remote_path(relpath))
 
1035
        if resp == ('yes', ):
 
1036
            return True
 
1037
        elif resp == ('no', ):
 
1038
            return False
 
1039
        else:
 
1040
            self._translate_error(resp)
 
1041
 
 
1042
    def get(self, relpath):
 
1043
        """Return file-like object reading the contents of a remote file.
 
1044
        
 
1045
        :see: Transport.get_bytes()/get_file()
 
1046
        """
 
1047
        return StringIO(self.get_bytes(relpath))
 
1048
 
 
1049
    def get_bytes(self, relpath):
 
1050
        remote = self._remote_path(relpath)
 
1051
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1052
        protocol.call('get', remote)
 
1053
        resp = protocol.read_response_tuple(True)
 
1054
        if resp != ('ok', ):
 
1055
            protocol.cancel_read_body()
 
1056
            self._translate_error(resp, relpath)
 
1057
        return protocol.read_body_bytes()
 
1058
 
 
1059
    def _serialise_optional_mode(self, mode):
 
1060
        if mode is None:
 
1061
            return ''
 
1062
        else:
 
1063
            return '%d' % mode
 
1064
 
 
1065
    def mkdir(self, relpath, mode=None):
 
1066
        resp = self._call2('mkdir', self._remote_path(relpath),
 
1067
            self._serialise_optional_mode(mode))
 
1068
        self._translate_error(resp)
 
1069
 
 
1070
    def put_bytes(self, relpath, upload_contents, mode=None):
 
1071
        # FIXME: upload_file is probably not safe for non-ascii characters -
 
1072
        # should probably just pass all parameters as length-delimited
 
1073
        # strings?
 
1074
        resp = self._call_with_body_bytes('put',
 
1075
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
1076
            upload_contents)
 
1077
        self._translate_error(resp)
 
1078
 
 
1079
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
 
1080
                             create_parent_dir=False,
 
1081
                             dir_mode=None):
 
1082
        """See Transport.put_bytes_non_atomic."""
 
1083
        # FIXME: no encoding in the transport!
 
1084
        create_parent_str = 'F'
 
1085
        if create_parent_dir:
 
1086
            create_parent_str = 'T'
 
1087
 
 
1088
        resp = self._call_with_body_bytes(
 
1089
            'put_non_atomic',
 
1090
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
 
1091
             create_parent_str, self._serialise_optional_mode(dir_mode)),
 
1092
            bytes)
 
1093
        self._translate_error(resp)
 
1094
 
 
1095
    def put_file(self, relpath, upload_file, mode=None):
 
1096
        # its not ideal to seek back, but currently put_non_atomic_file depends
 
1097
        # on transports not reading before failing - which is a faulty
 
1098
        # assumption I think - RBC 20060915
 
1099
        pos = upload_file.tell()
 
1100
        try:
 
1101
            return self.put_bytes(relpath, upload_file.read(), mode)
 
1102
        except:
 
1103
            upload_file.seek(pos)
 
1104
            raise
 
1105
 
 
1106
    def put_file_non_atomic(self, relpath, f, mode=None,
 
1107
                            create_parent_dir=False,
 
1108
                            dir_mode=None):
 
1109
        return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
 
1110
                                         create_parent_dir=create_parent_dir,
 
1111
                                         dir_mode=dir_mode)
 
1112
 
 
1113
    def append_file(self, relpath, from_file, mode=None):
 
1114
        return self.append_bytes(relpath, from_file.read(), mode)
 
1115
        
 
1116
    def append_bytes(self, relpath, bytes, mode=None):
 
1117
        resp = self._call_with_body_bytes(
 
1118
            'append',
 
1119
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
1120
            bytes)
 
1121
        if resp[0] == 'appended':
 
1122
            return int(resp[1])
 
1123
        self._translate_error(resp)
 
1124
 
 
1125
    def delete(self, relpath):
 
1126
        resp = self._call2('delete', self._remote_path(relpath))
 
1127
        self._translate_error(resp)
 
1128
 
 
1129
    def readv(self, relpath, offsets):
 
1130
        if not offsets:
 
1131
            return
 
1132
 
 
1133
        offsets = list(offsets)
 
1134
 
 
1135
        sorted_offsets = sorted(offsets)
 
1136
        # turn the list of offsets into a stack
 
1137
        offset_stack = iter(offsets)
 
1138
        cur_offset_and_size = offset_stack.next()
 
1139
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
1140
                               limit=self._max_readv_combine,
 
1141
                               fudge_factor=self._bytes_to_read_before_seek))
 
1142
 
 
1143
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1144
        protocol.call_with_body_readv_array(
 
1145
            ('readv', self._remote_path(relpath)),
 
1146
            [(c.start, c.length) for c in coalesced])
 
1147
        resp = protocol.read_response_tuple(True)
 
1148
 
 
1149
        if resp[0] != 'readv':
 
1150
            # This should raise an exception
 
1151
            protocol.cancel_read_body()
 
1152
            self._translate_error(resp)
 
1153
            return
 
1154
 
 
1155
        # FIXME: this should know how many bytes are needed, for clarity.
 
1156
        data = protocol.read_body_bytes()
 
1157
        # Cache the results, but only until they have been fulfilled
 
1158
        data_map = {}
 
1159
        for c_offset in coalesced:
 
1160
            if len(data) < c_offset.length:
 
1161
                raise errors.ShortReadvError(relpath, c_offset.start,
 
1162
                            c_offset.length, actual=len(data))
 
1163
            for suboffset, subsize in c_offset.ranges:
 
1164
                key = (c_offset.start+suboffset, subsize)
 
1165
                data_map[key] = data[suboffset:suboffset+subsize]
 
1166
            data = data[c_offset.length:]
 
1167
 
 
1168
            # Now that we've read some data, see if we can yield anything back
 
1169
            while cur_offset_and_size in data_map:
 
1170
                this_data = data_map.pop(cur_offset_and_size)
 
1171
                yield cur_offset_and_size[0], this_data
 
1172
                cur_offset_and_size = offset_stack.next()
 
1173
 
 
1174
    def rename(self, rel_from, rel_to):
 
1175
        self._call('rename',
 
1176
                   self._remote_path(rel_from),
 
1177
                   self._remote_path(rel_to))
 
1178
 
 
1179
    def move(self, rel_from, rel_to):
 
1180
        self._call('move',
 
1181
                   self._remote_path(rel_from),
 
1182
                   self._remote_path(rel_to))
 
1183
 
 
1184
    def rmdir(self, relpath):
 
1185
        resp = self._call('rmdir', self._remote_path(relpath))
 
1186
 
 
1187
    def _translate_error(self, resp, orig_path=None):
 
1188
        """Raise an exception from a response"""
 
1189
        if resp is None:
 
1190
            what = None
 
1191
        else:
 
1192
            what = resp[0]
 
1193
        if what == 'ok':
 
1194
            return
 
1195
        elif what == 'NoSuchFile':
 
1196
            if orig_path is not None:
 
1197
                error_path = orig_path
 
1198
            else:
 
1199
                error_path = resp[1]
 
1200
            raise errors.NoSuchFile(error_path)
 
1201
        elif what == 'error':
 
1202
            raise errors.SmartProtocolError(unicode(resp[1]))
 
1203
        elif what == 'FileExists':
 
1204
            raise errors.FileExists(resp[1])
 
1205
        elif what == 'DirectoryNotEmpty':
 
1206
            raise errors.DirectoryNotEmpty(resp[1])
 
1207
        elif what == 'ShortReadvError':
 
1208
            raise errors.ShortReadvError(resp[1], int(resp[2]),
 
1209
                                         int(resp[3]), int(resp[4]))
 
1210
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
 
1211
            encoding = str(resp[1]) # encoding must always be a string
 
1212
            val = resp[2]
 
1213
            start = int(resp[3])
 
1214
            end = int(resp[4])
 
1215
            reason = str(resp[5]) # reason must always be a string
 
1216
            if val.startswith('u:'):
 
1217
                val = val[2:]
 
1218
            elif val.startswith('s:'):
 
1219
                val = val[2:].decode('base64')
 
1220
            if what == 'UnicodeDecodeError':
 
1221
                raise UnicodeDecodeError(encoding, val, start, end, reason)
 
1222
            elif what == 'UnicodeEncodeError':
 
1223
                raise UnicodeEncodeError(encoding, val, start, end, reason)
 
1224
        elif what == "ReadOnlyError":
 
1225
            raise errors.TransportNotPossible('readonly transport')
 
1226
        else:
 
1227
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
 
1228
 
 
1229
    def disconnect(self):
 
1230
        self._medium.disconnect()
 
1231
 
 
1232
    def delete_tree(self, relpath):
 
1233
        raise errors.TransportNotPossible('readonly transport')
 
1234
 
 
1235
    def stat(self, relpath):
 
1236
        resp = self._call2('stat', self._remote_path(relpath))
 
1237
        if resp[0] == 'stat':
 
1238
            return SmartStat(int(resp[1]), int(resp[2], 8))
 
1239
        else:
 
1240
            self._translate_error(resp)
 
1241
 
 
1242
    ## def lock_read(self, relpath):
 
1243
    ##     """Lock the given file for shared (read) access.
 
1244
    ##     :return: A lock object, which should be passed to Transport.unlock()
 
1245
    ##     """
 
1246
    ##     # The old RemoteBranch ignore lock for reading, so we will
 
1247
    ##     # continue that tradition and return a bogus lock object.
 
1248
    ##     class BogusLock(object):
 
1249
    ##         def __init__(self, path):
 
1250
    ##             self.path = path
 
1251
    ##         def unlock(self):
 
1252
    ##             pass
 
1253
    ##     return BogusLock(relpath)
 
1254
 
 
1255
    def listable(self):
 
1256
        return True
 
1257
 
 
1258
    def list_dir(self, relpath):
 
1259
        resp = self._call2('list_dir', self._remote_path(relpath))
 
1260
        if resp[0] == 'names':
 
1261
            return [name.encode('ascii') for name in resp[1:]]
 
1262
        else:
 
1263
            self._translate_error(resp)
 
1264
 
 
1265
    def iter_files_recursive(self):
 
1266
        resp = self._call2('iter_files_recursive', self._remote_path(''))
 
1267
        if resp[0] == 'names':
 
1268
            return resp[1:]
 
1269
        else:
 
1270
            self._translate_error(resp)
 
1271
 
 
1272
 
 
1273
class SmartClientMediumRequest(object):
 
1274
    """A request on a SmartClientMedium.
 
1275
 
 
1276
    Each request allows bytes to be provided to it via accept_bytes, and then
 
1277
    the response bytes to be read via read_bytes.
 
1278
 
 
1279
    For instance:
 
1280
    request.accept_bytes('123')
 
1281
    request.finished_writing()
 
1282
    result = request.read_bytes(3)
 
1283
    request.finished_reading()
 
1284
 
 
1285
    It is up to the individual SmartClientMedium whether multiple concurrent
 
1286
    requests can exist. See SmartClientMedium.get_request to obtain instances 
 
1287
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
1288
    details on concurrency and pipelining.
 
1289
    """
 
1290
 
 
1291
    def __init__(self, medium):
 
1292
        """Construct a SmartClientMediumRequest for the medium medium."""
 
1293
        self._medium = medium
 
1294
        # we track state by constants - we may want to use the same
 
1295
        # pattern as BodyReader if it gets more complex.
 
1296
        # valid states are: "writing", "reading", "done"
 
1297
        self._state = "writing"
 
1298
 
 
1299
    def accept_bytes(self, bytes):
 
1300
        """Accept bytes for inclusion in this request.
 
1301
 
 
1302
        This method may not be be called after finished_writing() has been
 
1303
        called.  It depends upon the Medium whether or not the bytes will be
 
1304
        immediately transmitted. Message based Mediums will tend to buffer the
 
1305
        bytes until finished_writing() is called.
 
1306
 
 
1307
        :param bytes: A bytestring.
 
1308
        """
 
1309
        if self._state != "writing":
 
1310
            raise errors.WritingCompleted(self)
 
1311
        self._accept_bytes(bytes)
 
1312
 
 
1313
    def _accept_bytes(self, bytes):
 
1314
        """Helper for accept_bytes.
 
1315
 
 
1316
        Accept_bytes checks the state of the request to determing if bytes
 
1317
        should be accepted. After that it hands off to _accept_bytes to do the
 
1318
        actual acceptance.
 
1319
        """
 
1320
        raise NotImplementedError(self._accept_bytes)
 
1321
 
 
1322
    def finished_reading(self):
 
1323
        """Inform the request that all desired data has been read.
 
1324
 
 
1325
        This will remove the request from the pipeline for its medium (if the
 
1326
        medium supports pipelining) and any further calls to methods on the
 
1327
        request will raise ReadingCompleted.
 
1328
        """
 
1329
        if self._state == "writing":
 
1330
            raise errors.WritingNotComplete(self)
 
1331
        if self._state != "reading":
 
1332
            raise errors.ReadingCompleted(self)
 
1333
        self._state = "done"
 
1334
        self._finished_reading()
 
1335
 
 
1336
    def _finished_reading(self):
 
1337
        """Helper for finished_reading.
 
1338
 
 
1339
        finished_reading checks the state of the request to determine if 
 
1340
        finished_reading is allowed, and if it is hands off to _finished_reading
 
1341
        to perform the action.
 
1342
        """
 
1343
        raise NotImplementedError(self._finished_reading)
 
1344
 
 
1345
    def finished_writing(self):
 
1346
        """Finish the writing phase of this request.
 
1347
 
 
1348
        This will flush all pending data for this request along the medium.
 
1349
        After calling finished_writing, you may not call accept_bytes anymore.
 
1350
        """
 
1351
        if self._state != "writing":
 
1352
            raise errors.WritingCompleted(self)
 
1353
        self._state = "reading"
 
1354
        self._finished_writing()
 
1355
 
 
1356
    def _finished_writing(self):
 
1357
        """Helper for finished_writing.
 
1358
 
 
1359
        finished_writing checks the state of the request to determine if 
 
1360
        finished_writing is allowed, and if it is hands off to _finished_writing
 
1361
        to perform the action.
 
1362
        """
 
1363
        raise NotImplementedError(self._finished_writing)
 
1364
 
 
1365
    def read_bytes(self, count):
 
1366
        """Read bytes from this requests response.
 
1367
 
 
1368
        This method will block and wait for count bytes to be read. It may not
 
1369
        be invoked until finished_writing() has been called - this is to ensure
 
1370
        a message-based approach to requests, for compatability with message
 
1371
        based mediums like HTTP.
 
1372
        """
 
1373
        if self._state == "writing":
 
1374
            raise errors.WritingNotComplete(self)
 
1375
        if self._state != "reading":
 
1376
            raise errors.ReadingCompleted(self)
 
1377
        return self._read_bytes(count)
 
1378
 
 
1379
    def _read_bytes(self, count):
 
1380
        """Helper for read_bytes.
 
1381
 
 
1382
        read_bytes checks the state of the request to determing if bytes
 
1383
        should be read. After that it hands off to _read_bytes to do the
 
1384
        actual read.
 
1385
        """
 
1386
        raise NotImplementedError(self._read_bytes)
 
1387
 
 
1388
 
 
1389
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
 
1390
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
 
1391
 
 
1392
    def __init__(self, medium):
 
1393
        SmartClientMediumRequest.__init__(self, medium)
 
1394
        # check that we are safe concurrency wise. If some streams start
 
1395
        # allowing concurrent requests - i.e. via multiplexing - then this
 
1396
        # assert should be moved to SmartClientStreamMedium.get_request,
 
1397
        # and the setting/unsetting of _current_request likewise moved into
 
1398
        # that class : but its unneeded overhead for now. RBC 20060922
 
1399
        if self._medium._current_request is not None:
 
1400
            raise errors.TooManyConcurrentRequests(self._medium)
 
1401
        self._medium._current_request = self
 
1402
 
 
1403
    def _accept_bytes(self, bytes):
 
1404
        """See SmartClientMediumRequest._accept_bytes.
 
1405
        
 
1406
        This forwards to self._medium._accept_bytes because we are operating
 
1407
        on the mediums stream.
 
1408
        """
 
1409
        self._medium._accept_bytes(bytes)
 
1410
 
 
1411
    def _finished_reading(self):
 
1412
        """See SmartClientMediumRequest._finished_reading.
 
1413
 
 
1414
        This clears the _current_request on self._medium to allow a new 
 
1415
        request to be created.
 
1416
        """
 
1417
        assert self._medium._current_request is self
 
1418
        self._medium._current_request = None
 
1419
        
 
1420
    def _finished_writing(self):
 
1421
        """See SmartClientMediumRequest._finished_writing.
 
1422
 
 
1423
        This invokes self._medium._flush to ensure all bytes are transmitted.
 
1424
        """
 
1425
        self._medium._flush()
 
1426
 
 
1427
    def _read_bytes(self, count):
 
1428
        """See SmartClientMediumRequest._read_bytes.
 
1429
        
 
1430
        This forwards to self._medium._read_bytes because we are operating
 
1431
        on the mediums stream.
 
1432
        """
 
1433
        return self._medium._read_bytes(count)
 
1434
 
 
1435
 
 
1436
class SmartClientRequestProtocolOne(SmartProtocolBase):
 
1437
    """The client-side protocol for smart version 1."""
 
1438
 
 
1439
    def __init__(self, request):
 
1440
        """Construct a SmartClientRequestProtocolOne.
 
1441
 
 
1442
        :param request: A SmartClientMediumRequest to serialise onto and
 
1443
            deserialise from.
 
1444
        """
 
1445
        self._request = request
 
1446
        self._body_buffer = None
 
1447
 
 
1448
    def call(self, *args):
 
1449
        bytes = _encode_tuple(args)
 
1450
        self._request.accept_bytes(bytes)
 
1451
        self._request.finished_writing()
 
1452
 
 
1453
    def call_with_body_bytes(self, args, body):
 
1454
        """Make a remote call of args with body bytes 'body'.
 
1455
 
 
1456
        After calling this, call read_response_tuple to find the result out.
 
1457
        """
 
1458
        bytes = _encode_tuple(args)
 
1459
        self._request.accept_bytes(bytes)
 
1460
        bytes = self._encode_bulk_data(body)
 
1461
        self._request.accept_bytes(bytes)
 
1462
        self._request.finished_writing()
 
1463
 
 
1464
    def call_with_body_readv_array(self, args, body):
 
1465
        """Make a remote call with a readv array.
 
1466
 
 
1467
        The body is encoded with one line per readv offset pair. The numbers in
 
1468
        each pair are separated by a comma, and no trailing \n is emitted.
 
1469
        """
 
1470
        bytes = _encode_tuple(args)
 
1471
        self._request.accept_bytes(bytes)
 
1472
        readv_bytes = self._serialise_offsets(body)
 
1473
        bytes = self._encode_bulk_data(readv_bytes)
 
1474
        self._request.accept_bytes(bytes)
 
1475
        self._request.finished_writing()
 
1476
 
 
1477
    def cancel_read_body(self):
 
1478
        """After expecting a body, a response code may indicate one otherwise.
 
1479
 
 
1480
        This method lets the domain client inform the protocol that no body
 
1481
        will be transmitted. This is a terminal method: after calling it the
 
1482
        protocol is not able to be used further.
 
1483
        """
 
1484
        self._request.finished_reading()
 
1485
 
 
1486
    def read_response_tuple(self, expect_body=False):
 
1487
        """Read a response tuple from the wire.
 
1488
 
 
1489
        This should only be called once.
 
1490
        """
 
1491
        result = self._recv_tuple()
 
1492
        if not expect_body:
 
1493
            self._request.finished_reading()
 
1494
        return result
 
1495
 
 
1496
    def read_body_bytes(self, count=-1):
 
1497
        """Read bytes from the body, decoding into a byte stream.
 
1498
        
 
1499
        We read all bytes at once to ensure we've checked the trailer for 
 
1500
        errors, and then feed the buffer back as read_body_bytes is called.
 
1501
        """
 
1502
        if self._body_buffer is not None:
 
1503
            return self._body_buffer.read(count)
 
1504
        _body_decoder = LengthPrefixedBodyDecoder()
 
1505
 
 
1506
        while not _body_decoder.finished_reading:
 
1507
            bytes_wanted = _body_decoder.bytes_left
 
1508
            if bytes_wanted is None:
 
1509
                bytes_wanted = 1
 
1510
            bytes = self._request.read_bytes(bytes_wanted)
 
1511
            _body_decoder.accept_bytes(bytes)
 
1512
        self._request.finished_reading()
 
1513
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
 
1514
        # XXX: TODO check the trailer result.
 
1515
        return self._body_buffer.read(count)
 
1516
 
 
1517
    def _recv_tuple(self):
 
1518
        """Receive a tuple from the medium request."""
 
1519
        line = ''
 
1520
        while not line or line[-1] != '\n':
 
1521
            # yes, this is inefficient - but tuples are short.
 
1522
            new_char = self._request.read_bytes(1)
 
1523
            line += new_char
 
1524
            assert new_char != '', "end of file reading from server."
 
1525
        return _decode_tuple(line)
 
1526
 
 
1527
    def query_version(self):
 
1528
        """Return protocol version number of the server."""
 
1529
        self.call('hello')
 
1530
        resp = self.read_response_tuple()
 
1531
        if resp == ('ok', '1'):
 
1532
            return 1
 
1533
        else:
 
1534
            raise errors.SmartProtocolError("bad response %r" % (resp,))
 
1535
 
 
1536
 
 
1537
class SmartClientMedium(object):
 
1538
    """Smart client is a medium for sending smart protocol requests over."""
 
1539
 
 
1540
    def disconnect(self):
 
1541
        """If this medium maintains a persistent connection, close it.
 
1542
        
 
1543
        The default implementation does nothing.
 
1544
        """
 
1545
        
 
1546
 
 
1547
class SmartClientStreamMedium(SmartClientMedium):
 
1548
    """Stream based medium common class.
 
1549
 
 
1550
    SmartClientStreamMediums operate on a stream. All subclasses use a common
 
1551
    SmartClientStreamMediumRequest for their requests, and should implement
 
1552
    _accept_bytes and _read_bytes to allow the request objects to send and
 
1553
    receive bytes.
 
1554
    """
 
1555
 
 
1556
    def __init__(self):
 
1557
        self._current_request = None
 
1558
 
 
1559
    def accept_bytes(self, bytes):
 
1560
        self._accept_bytes(bytes)
 
1561
 
 
1562
    def __del__(self):
 
1563
        """The SmartClientStreamMedium knows how to close the stream when it is
 
1564
        finished with it.
 
1565
        """
 
1566
        self.disconnect()
 
1567
 
 
1568
    def _flush(self):
 
1569
        """Flush the output stream.
 
1570
        
 
1571
        This method is used by the SmartClientStreamMediumRequest to ensure that
 
1572
        all data for a request is sent, to avoid long timeouts or deadlocks.
 
1573
        """
 
1574
        raise NotImplementedError(self._flush)
 
1575
 
 
1576
    def get_request(self):
 
1577
        """See SmartClientMedium.get_request().
 
1578
 
 
1579
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
 
1580
        for get_request.
 
1581
        """
 
1582
        return SmartClientStreamMediumRequest(self)
 
1583
 
 
1584
    def read_bytes(self, count):
 
1585
        return self._read_bytes(count)
 
1586
 
 
1587
 
 
1588
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
 
1589
    """A client medium using simple pipes.
 
1590
    
 
1591
    This client does not manage the pipes: it assumes they will always be open.
 
1592
    """
 
1593
 
 
1594
    def __init__(self, readable_pipe, writeable_pipe):
 
1595
        SmartClientStreamMedium.__init__(self)
 
1596
        self._readable_pipe = readable_pipe
 
1597
        self._writeable_pipe = writeable_pipe
 
1598
 
 
1599
    def _accept_bytes(self, bytes):
 
1600
        """See SmartClientStreamMedium.accept_bytes."""
 
1601
        self._writeable_pipe.write(bytes)
 
1602
 
 
1603
    def _flush(self):
 
1604
        """See SmartClientStreamMedium._flush()."""
 
1605
        self._writeable_pipe.flush()
 
1606
 
 
1607
    def _read_bytes(self, count):
 
1608
        """See SmartClientStreamMedium._read_bytes."""
 
1609
        return self._readable_pipe.read(count)
 
1610
 
 
1611
 
 
1612
class SmartSSHClientMedium(SmartClientStreamMedium):
 
1613
    """A client medium using SSH."""
 
1614
    
 
1615
    def __init__(self, host, port=None, username=None, password=None,
 
1616
            vendor=None):
 
1617
        """Creates a client that will connect on the first use.
 
1618
        
 
1619
        :param vendor: An optional override for the ssh vendor to use. See
 
1620
            bzrlib.transport.ssh for details on ssh vendors.
 
1621
        """
 
1622
        SmartClientStreamMedium.__init__(self)
 
1623
        self._connected = False
 
1624
        self._host = host
 
1625
        self._password = password
 
1626
        self._port = port
 
1627
        self._username = username
 
1628
        self._read_from = None
 
1629
        self._ssh_connection = None
 
1630
        self._vendor = vendor
 
1631
        self._write_to = None
 
1632
 
 
1633
    def _accept_bytes(self, bytes):
 
1634
        """See SmartClientStreamMedium.accept_bytes."""
 
1635
        self._ensure_connection()
 
1636
        self._write_to.write(bytes)
 
1637
 
 
1638
    def disconnect(self):
 
1639
        """See SmartClientMedium.disconnect()."""
 
1640
        if not self._connected:
 
1641
            return
 
1642
        self._read_from.close()
 
1643
        self._write_to.close()
 
1644
        self._ssh_connection.close()
 
1645
        self._connected = False
 
1646
 
 
1647
    def _ensure_connection(self):
 
1648
        """Connect this medium if not already connected."""
 
1649
        if self._connected:
 
1650
            return
 
1651
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
1652
        if self._vendor is None:
 
1653
            vendor = ssh._get_ssh_vendor()
 
1654
        else:
 
1655
            vendor = self._vendor
 
1656
        self._ssh_connection = vendor.connect_ssh(self._username,
 
1657
                self._password, self._host, self._port,
 
1658
                command=[executable, 'serve', '--inet', '--directory=/',
 
1659
                         '--allow-writes'])
 
1660
        self._read_from, self._write_to = \
 
1661
            self._ssh_connection.get_filelike_channels()
 
1662
        self._connected = True
 
1663
 
 
1664
    def _flush(self):
 
1665
        """See SmartClientStreamMedium._flush()."""
 
1666
        self._write_to.flush()
 
1667
 
 
1668
    def _read_bytes(self, count):
 
1669
        """See SmartClientStreamMedium.read_bytes."""
 
1670
        if not self._connected:
 
1671
            raise errors.MediumNotConnected(self)
 
1672
        return self._read_from.read(count)
 
1673
 
 
1674
 
 
1675
class SmartTCPClientMedium(SmartClientStreamMedium):
 
1676
    """A client medium using TCP."""
 
1677
    
 
1678
    def __init__(self, host, port):
 
1679
        """Creates a client that will connect on the first use."""
 
1680
        SmartClientStreamMedium.__init__(self)
 
1681
        self._connected = False
 
1682
        self._host = host
 
1683
        self._port = port
 
1684
        self._socket = None
 
1685
 
 
1686
    def _accept_bytes(self, bytes):
 
1687
        """See SmartClientMedium.accept_bytes."""
 
1688
        self._ensure_connection()
 
1689
        self._socket.sendall(bytes)
 
1690
 
 
1691
    def disconnect(self):
 
1692
        """See SmartClientMedium.disconnect()."""
 
1693
        if not self._connected:
 
1694
            return
 
1695
        self._socket.close()
 
1696
        self._socket = None
 
1697
        self._connected = False
 
1698
 
 
1699
    def _ensure_connection(self):
 
1700
        """Connect this medium if not already connected."""
 
1701
        if self._connected:
 
1702
            return
 
1703
        self._socket = socket.socket()
 
1704
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
1705
        result = self._socket.connect_ex((self._host, int(self._port)))
 
1706
        if result:
 
1707
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
 
1708
                    (self._host, self._port, os.strerror(result)))
 
1709
        self._connected = True
 
1710
 
 
1711
    def _flush(self):
 
1712
        """See SmartClientStreamMedium._flush().
 
1713
        
 
1714
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
1715
        add a means to do a flush, but that can be done in the future.
 
1716
        """
 
1717
 
 
1718
    def _read_bytes(self, count):
 
1719
        """See SmartClientMedium.read_bytes."""
 
1720
        if not self._connected:
 
1721
            raise errors.MediumNotConnected(self)
 
1722
        return self._socket.recv(count)
 
1723
 
 
1724
 
 
1725
class SmartTCPTransport(SmartTransport):
 
1726
    """Connection to smart server over plain tcp.
 
1727
    
 
1728
    This is essentially just a factory to get 'RemoteTransport(url,
 
1729
        SmartTCPClientMedium).
 
1730
    """
 
1731
 
 
1732
    def __init__(self, url):
 
1733
        _scheme, _username, _password, _host, _port, _path = \
 
1734
            transport.split_url(url)
 
1735
        try:
 
1736
            _port = int(_port)
 
1737
        except (ValueError, TypeError), e:
 
1738
            raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
 
1739
        medium = SmartTCPClientMedium(_host, _port)
 
1740
        super(SmartTCPTransport, self).__init__(url, medium=medium)
 
1741
 
 
1742
 
 
1743
try:
 
1744
    from bzrlib.transport import ssh
 
1745
except errors.ParamikoNotPresent:
 
1746
    # no paramiko, no SSHTransport.
 
1747
    pass
 
1748
else:
 
1749
    class SmartSSHTransport(SmartTransport):
 
1750
        """Connection to smart server over SSH.
 
1751
 
 
1752
        This is essentially just a factory to get 'RemoteTransport(url,
 
1753
            SmartSSHClientMedium).
 
1754
        """
 
1755
 
 
1756
        def __init__(self, url):
 
1757
            _scheme, _username, _password, _host, _port, _path = \
 
1758
                transport.split_url(url)
 
1759
            try:
 
1760
                if _port is not None:
 
1761
                    _port = int(_port)
 
1762
            except (ValueError, TypeError), e:
 
1763
                raise errors.InvalidURL(path=url, extra="invalid port %s" % 
 
1764
                    _port)
 
1765
            medium = SmartSSHClientMedium(_host, _port, _username, _password)
 
1766
            super(SmartSSHTransport, self).__init__(url, medium=medium)
 
1767
 
 
1768
 
 
1769
def get_test_permutations():
 
1770
    """Return (transport, server) permutations for testing."""
 
1771
    ### We may need a little more test framework support to construct an
 
1772
    ### appropriate RemoteTransport in the future.
 
1773
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]