/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

  • Committer: bialix at ukr
  • Date: 2007-03-02 23:53:16 UTC
  • mto: This revision was merged to the branch mainline in revision 2318.
  • Revision ID: bialix@ukr.net-20070302235316-8n8d4osjeha5c4ge
Fix for selftest of man generator with instaled plugin 'htmllog' (no help for plugin's log formatter)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Smart-server protocol, client and server.
 
18
 
 
19
Requests are sent as a command and list of arguments, followed by optional
 
20
bulk body data.  Responses are similarly a response and list of arguments,
 
21
followed by bulk body data. ::
 
22
 
 
23
  SEP := '\001'
 
24
    Fields are separated by Ctrl-A.
 
25
  BULK_DATA := CHUNK TRAILER
 
26
    Chunks can be repeated as many times as necessary.
 
27
  CHUNK := CHUNK_LEN CHUNK_BODY
 
28
  CHUNK_LEN := DIGIT+ NEWLINE
 
29
    Gives the number of bytes in the following chunk.
 
30
  CHUNK_BODY := BYTE[chunk_len]
 
31
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
 
32
  SUCCESS_TRAILER := 'done' NEWLINE
 
33
  ERROR_TRAILER := 
 
34
 
 
35
Paths are passed across the network.  The client needs to see a namespace that
 
36
includes any repository that might need to be referenced, and the client needs
 
37
to know about a root directory beyond which it cannot ascend.
 
38
 
 
39
Servers run over ssh will typically want to be able to access any path the user 
 
40
can access.  Public servers on the other hand (which might be over http, ssh
 
41
or tcp) will typically want to restrict access to only a particular directory 
 
42
and its children, so will want to do a software virtual root at that level.
 
43
In other words they'll want to rewrite incoming paths to be under that level
 
44
(and prevent escaping using ../ tricks.)
 
45
 
 
46
URLs that include ~ should probably be passed across to the server verbatim
 
47
and the server can expand them.  This will proably not be meaningful when 
 
48
limited to a directory?
 
49
 
 
50
At the bottom level socket, pipes, HTTP server.  For sockets, we have the idea
 
51
that you have multiple requests and get a read error because the other side did
 
52
shutdown.  For pipes we have read pipe which will have a zero read which marks
 
53
end-of-file.  For HTTP server environment there is not end-of-stream because
 
54
each request coming into the server is independent.
 
55
 
 
56
So we need a wrapper around pipes and sockets to seperate out requests from
 
57
substrate and this will give us a single model which is consist for HTTP,
 
58
sockets and pipes.
 
59
 
 
60
Server-side
 
61
-----------
 
62
 
 
63
 MEDIUM  (factory for protocol, reads bytes & pushes to protocol,
 
64
          uses protocol to detect end-of-request, sends written
 
65
          bytes to client) e.g. socket, pipe, HTTP request handler.
 
66
  ^
 
67
  | bytes.
 
68
  v
 
69
 
 
70
PROTOCOL  (serialization, deserialization)  accepts bytes for one
 
71
          request, decodes according to internal state, pushes
 
72
          structured data to handler.  accepts structured data from
 
73
          handler and encodes and writes to the medium.  factory for
 
74
          handler.
 
75
  ^
 
76
  | structured data
 
77
  v
 
78
 
 
79
HANDLER   (domain logic) accepts structured data, operates state
 
80
          machine until the request can be satisfied,
 
81
          sends structured data to the protocol.
 
82
 
 
83
 
 
84
Client-side
 
85
-----------
 
86
 
 
87
 CLIENT             domain logic, accepts domain requests, generated structured
 
88
                    data, reads structured data from responses and turns into
 
89
                    domain data.  Sends structured data to the protocol.
 
90
                    Operates state machines until the request can be delivered
 
91
                    (e.g. reading from a bundle generated in bzrlib to deliver a
 
92
                    complete request).
 
93
 
 
94
                    Possibly this should just be RemoteBzrDir, RemoteTransport,
 
95
                    ...
 
96
  ^
 
97
  | structured data
 
98
  v
 
99
 
 
100
PROTOCOL  (serialization, deserialization)  accepts structured data for one
 
101
          request, encodes and writes to the medium.  Reads bytes from the
 
102
          medium, decodes and allows the client to read structured data.
 
103
  ^
 
104
  | bytes.
 
105
  v
 
106
 
 
107
 MEDIUM  (accepts bytes from the protocol & delivers to the remote server.
 
108
          Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
 
109
"""
 
110
 
 
111
 
 
112
# TODO: _translate_error should be on the client, not the transport because
 
113
#     error coding is wire protocol specific.
 
114
 
 
115
# TODO: A plain integer from query_version is too simple; should give some
 
116
# capabilities too?
 
117
 
 
118
# TODO: Server should probably catch exceptions within itself and send them
 
119
# back across the network.  (But shouldn't catch KeyboardInterrupt etc)
 
120
# Also needs to somehow report protocol errors like bad requests.  Need to
 
121
# consider how we'll handle error reporting, e.g. if we get halfway through a
 
122
# bulk transfer and then something goes wrong.
 
123
 
 
124
# TODO: Standard marker at start of request/response lines?
 
125
 
 
126
# TODO: Make each request and response self-validatable, e.g. with checksums.
 
127
#
 
128
# TODO: get/put objects could be changed to gradually read back the data as it
 
129
# comes across the network
 
130
#
 
131
# TODO: What should the server do if it hits an error and has to terminate?
 
132
#
 
133
# TODO: is it useful to allow multiple chunks in the bulk data?
 
134
#
 
135
# TODO: If we get an exception during transmission of bulk data we can't just
 
136
# emit the exception because it won't be seen.
 
137
#   John proposes:  I think it would be worthwhile to have a header on each
 
138
#   chunk, that indicates it is another chunk. Then you can send an 'error'
 
139
#   chunk as long as you finish the previous chunk.
 
140
#
 
141
# TODO: Clone method on Transport; should work up towards parent directory;
 
142
# unclear how this should be stored or communicated to the server... maybe
 
143
# just pass it on all relevant requests?
 
144
#
 
145
# TODO: Better name than clone() for changing between directories.  How about
 
146
# open_dir or change_dir or chdir?
 
147
#
 
148
# TODO: Is it really good to have the notion of current directory within the
 
149
# connection?  Perhaps all Transports should factor out a common connection
 
150
# from the thing that has the directory context?
 
151
#
 
152
# TODO: Pull more things common to sftp and ssh to a higher level.
 
153
#
 
154
# TODO: The server that manages a connection should be quite small and retain
 
155
# minimum state because each of the requests are supposed to be stateless.
 
156
# Then we can write another implementation that maps to http.
 
157
#
 
158
# TODO: What to do when a client connection is garbage collected?  Maybe just
 
159
# abruptly drop the connection?
 
160
#
 
161
# TODO: Server in some cases will need to restrict access to files outside of
 
162
# a particular root directory.  LocalTransport doesn't do anything to stop you
 
163
# ascending above the base directory, so we need to prevent paths
 
164
# containing '..' in either the server or transport layers.  (Also need to
 
165
# consider what happens if someone creates a symlink pointing outside the 
 
166
# directory tree...)
 
167
#
 
168
# TODO: Server should rebase absolute paths coming across the network to put
 
169
# them under the virtual root, if one is in use.  LocalTransport currently
 
170
# doesn't do that; if you give it an absolute path it just uses it.
 
171
 
172
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
 
173
# urlescape them instead.  Indeed possibly this should just literally be
 
174
# http-over-ssh.
 
175
#
 
176
# FIXME: This transport, with several others, has imperfect handling of paths
 
177
# within urls.  It'd probably be better for ".." from a root to raise an error
 
178
# rather than return the same directory as we do at present.
 
179
#
 
180
# TODO: Rather than working at the Transport layer we want a Branch,
 
181
# Repository or BzrDir objects that talk to a server.
 
182
#
 
183
# TODO: Probably want some way for server commands to gradually produce body
 
184
# data rather than passing it as a string; they could perhaps pass an
 
185
# iterator-like callback that will gradually yield data; it probably needs a
 
186
# close() method that will always be closed to do any necessary cleanup.
 
187
#
 
188
# TODO: Split the actual smart server from the ssh encoding of it.
 
189
#
 
190
# TODO: Perhaps support file-level readwrite operations over the transport
 
191
# too.
 
192
#
 
193
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
 
194
# branch doing file-level operations.
 
195
#
 
196
 
 
197
from cStringIO import StringIO
 
198
import os
 
199
import socket
 
200
import tempfile
 
201
import threading
 
202
import urllib
 
203
import urlparse
 
204
 
 
205
from bzrlib import (
 
206
    bzrdir,
 
207
    errors,
 
208
    revision,
 
209
    transport,
 
210
    trace,
 
211
    urlutils,
 
212
    )
 
213
from bzrlib.bundle.serializer import write_bundle
 
214
try:
 
215
    from bzrlib.transport import ssh
 
216
except errors.ParamikoNotPresent:
 
217
    # no paramiko.  SmartSSHClientMedium will break.
 
218
    pass
 
219
 
 
220
# must do this otherwise urllib can't parse the urls properly :(
 
221
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh', 'bzr+http']:
 
222
    transport.register_urlparse_netloc_protocol(scheme)
 
223
del scheme
 
224
 
 
225
 
 
226
# Port 4155 is the default port for bzr://, registered with IANA.
 
227
BZR_DEFAULT_PORT = 4155
 
228
 
 
229
 
 
230
def _recv_tuple(from_file):
 
231
    req_line = from_file.readline()
 
232
    return _decode_tuple(req_line)
 
233
 
 
234
 
 
235
def _decode_tuple(req_line):
 
236
    if req_line == None or req_line == '':
 
237
        return None
 
238
    if req_line[-1] != '\n':
 
239
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
 
240
    return tuple(req_line[:-1].split('\x01'))
 
241
 
 
242
 
 
243
def _encode_tuple(args):
 
244
    """Encode the tuple args to a bytestream."""
 
245
    return '\x01'.join(args) + '\n'
 
246
 
 
247
 
 
248
class SmartProtocolBase(object):
 
249
    """Methods common to client and server"""
 
250
 
 
251
    # TODO: this only actually accomodates a single block; possibly should
 
252
    # support multiple chunks?
 
253
    def _encode_bulk_data(self, body):
 
254
        """Encode body as a bulk data chunk."""
 
255
        return ''.join(('%d\n' % len(body), body, 'done\n'))
 
256
 
 
257
    def _serialise_offsets(self, offsets):
 
258
        """Serialise a readv offset list."""
 
259
        txt = []
 
260
        for start, length in offsets:
 
261
            txt.append('%d,%d' % (start, length))
 
262
        return '\n'.join(txt)
 
263
        
 
264
 
 
265
class SmartServerRequestProtocolOne(SmartProtocolBase):
 
266
    """Server-side encoding and decoding logic for smart version 1."""
 
267
    
 
268
    def __init__(self, backing_transport, write_func):
 
269
        self._backing_transport = backing_transport
 
270
        self.excess_buffer = ''
 
271
        self._finished = False
 
272
        self.in_buffer = ''
 
273
        self.has_dispatched = False
 
274
        self.request = None
 
275
        self._body_decoder = None
 
276
        self._write_func = write_func
 
277
 
 
278
    def accept_bytes(self, bytes):
 
279
        """Take bytes, and advance the internal state machine appropriately.
 
280
        
 
281
        :param bytes: must be a byte string
 
282
        """
 
283
        assert isinstance(bytes, str)
 
284
        self.in_buffer += bytes
 
285
        if not self.has_dispatched:
 
286
            if '\n' not in self.in_buffer:
 
287
                # no command line yet
 
288
                return
 
289
            self.has_dispatched = True
 
290
            try:
 
291
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
 
292
                first_line += '\n'
 
293
                req_args = _decode_tuple(first_line)
 
294
                self.request = SmartServerRequestHandler(
 
295
                    self._backing_transport)
 
296
                self.request.dispatch_command(req_args[0], req_args[1:])
 
297
                if self.request.finished_reading:
 
298
                    # trivial request
 
299
                    self.excess_buffer = self.in_buffer
 
300
                    self.in_buffer = ''
 
301
                    self._send_response(self.request.response.args,
 
302
                        self.request.response.body)
 
303
            except KeyboardInterrupt:
 
304
                raise
 
305
            except Exception, exception:
 
306
                # everything else: pass to client, flush, and quit
 
307
                self._send_response(('error', str(exception)))
 
308
                return
 
309
 
 
310
        if self.has_dispatched:
 
311
            if self._finished:
 
312
                # nothing to do.XXX: this routine should be a single state 
 
313
                # machine too.
 
314
                self.excess_buffer += self.in_buffer
 
315
                self.in_buffer = ''
 
316
                return
 
317
            if self._body_decoder is None:
 
318
                self._body_decoder = LengthPrefixedBodyDecoder()
 
319
            self._body_decoder.accept_bytes(self.in_buffer)
 
320
            self.in_buffer = self._body_decoder.unused_data
 
321
            body_data = self._body_decoder.read_pending_data()
 
322
            self.request.accept_body(body_data)
 
323
            if self._body_decoder.finished_reading:
 
324
                self.request.end_of_body()
 
325
                assert self.request.finished_reading, \
 
326
                    "no more body, request not finished"
 
327
            if self.request.response is not None:
 
328
                self._send_response(self.request.response.args,
 
329
                    self.request.response.body)
 
330
                self.excess_buffer = self.in_buffer
 
331
                self.in_buffer = ''
 
332
            else:
 
333
                assert not self.request.finished_reading, \
 
334
                    "no response and we have finished reading."
 
335
 
 
336
    def _send_response(self, args, body=None):
 
337
        """Send a smart server response down the output stream."""
 
338
        assert not self._finished, 'response already sent'
 
339
        self._finished = True
 
340
        self._write_func(_encode_tuple(args))
 
341
        if body is not None:
 
342
            assert isinstance(body, str), 'body must be a str'
 
343
            bytes = self._encode_bulk_data(body)
 
344
            self._write_func(bytes)
 
345
 
 
346
    def next_read_size(self):
 
347
        if self._finished:
 
348
            return 0
 
349
        if self._body_decoder is None:
 
350
            return 1
 
351
        else:
 
352
            return self._body_decoder.next_read_size()
 
353
 
 
354
 
 
355
class LengthPrefixedBodyDecoder(object):
 
356
    """Decodes the length-prefixed bulk data."""
 
357
    
 
358
    def __init__(self):
 
359
        self.bytes_left = None
 
360
        self.finished_reading = False
 
361
        self.unused_data = ''
 
362
        self.state_accept = self._state_accept_expecting_length
 
363
        self.state_read = self._state_read_no_data
 
364
        self._in_buffer = ''
 
365
        self._trailer_buffer = ''
 
366
    
 
367
    def accept_bytes(self, bytes):
 
368
        """Decode as much of bytes as possible.
 
369
 
 
370
        If 'bytes' contains too much data it will be appended to
 
371
        self.unused_data.
 
372
 
 
373
        finished_reading will be set when no more data is required.  Further
 
374
        data will be appended to self.unused_data.
 
375
        """
 
376
        # accept_bytes is allowed to change the state
 
377
        current_state = self.state_accept
 
378
        self.state_accept(bytes)
 
379
        while current_state != self.state_accept:
 
380
            current_state = self.state_accept
 
381
            self.state_accept('')
 
382
 
 
383
    def next_read_size(self):
 
384
        if self.bytes_left is not None:
 
385
            # Ideally we want to read all the remainder of the body and the
 
386
            # trailer in one go.
 
387
            return self.bytes_left + 5
 
388
        elif self.state_accept == self._state_accept_reading_trailer:
 
389
            # Just the trailer left
 
390
            return 5 - len(self._trailer_buffer)
 
391
        elif self.state_accept == self._state_accept_expecting_length:
 
392
            # There's still at least 6 bytes left ('\n' to end the length, plus
 
393
            # 'done\n').
 
394
            return 6
 
395
        else:
 
396
            # Reading excess data.  Either way, 1 byte at a time is fine.
 
397
            return 1
 
398
        
 
399
    def read_pending_data(self):
 
400
        """Return any pending data that has been decoded."""
 
401
        return self.state_read()
 
402
 
 
403
    def _state_accept_expecting_length(self, bytes):
 
404
        self._in_buffer += bytes
 
405
        pos = self._in_buffer.find('\n')
 
406
        if pos == -1:
 
407
            return
 
408
        self.bytes_left = int(self._in_buffer[:pos])
 
409
        self._in_buffer = self._in_buffer[pos+1:]
 
410
        self.bytes_left -= len(self._in_buffer)
 
411
        self.state_accept = self._state_accept_reading_body
 
412
        self.state_read = self._state_read_in_buffer
 
413
 
 
414
    def _state_accept_reading_body(self, bytes):
 
415
        self._in_buffer += bytes
 
416
        self.bytes_left -= len(bytes)
 
417
        if self.bytes_left <= 0:
 
418
            # Finished with body
 
419
            if self.bytes_left != 0:
 
420
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
 
421
                self._in_buffer = self._in_buffer[:self.bytes_left]
 
422
            self.bytes_left = None
 
423
            self.state_accept = self._state_accept_reading_trailer
 
424
        
 
425
    def _state_accept_reading_trailer(self, bytes):
 
426
        self._trailer_buffer += bytes
 
427
        # TODO: what if the trailer does not match "done\n"?  Should this raise
 
428
        # a ProtocolViolation exception?
 
429
        if self._trailer_buffer.startswith('done\n'):
 
430
            self.unused_data = self._trailer_buffer[len('done\n'):]
 
431
            self.state_accept = self._state_accept_reading_unused
 
432
            self.finished_reading = True
 
433
    
 
434
    def _state_accept_reading_unused(self, bytes):
 
435
        self.unused_data += bytes
 
436
 
 
437
    def _state_read_no_data(self):
 
438
        return ''
 
439
 
 
440
    def _state_read_in_buffer(self):
 
441
        result = self._in_buffer
 
442
        self._in_buffer = ''
 
443
        return result
 
444
 
 
445
 
 
446
class SmartServerStreamMedium(object):
 
447
    """Handles smart commands coming over a stream.
 
448
 
 
449
    The stream may be a pipe connected to sshd, or a tcp socket, or an
 
450
    in-process fifo for testing.
 
451
 
 
452
    One instance is created for each connected client; it can serve multiple
 
453
    requests in the lifetime of the connection.
 
454
 
 
455
    The server passes requests through to an underlying backing transport, 
 
456
    which will typically be a LocalTransport looking at the server's filesystem.
 
457
    """
 
458
 
 
459
    def __init__(self, backing_transport):
 
460
        """Construct new server.
 
461
 
 
462
        :param backing_transport: Transport for the directory served.
 
463
        """
 
464
        # backing_transport could be passed to serve instead of __init__
 
465
        self.backing_transport = backing_transport
 
466
        self.finished = False
 
467
 
 
468
    def serve(self):
 
469
        """Serve requests until the client disconnects."""
 
470
        # Keep a reference to stderr because the sys module's globals get set to
 
471
        # None during interpreter shutdown.
 
472
        from sys import stderr
 
473
        try:
 
474
            while not self.finished:
 
475
                protocol = SmartServerRequestProtocolOne(self.backing_transport,
 
476
                                                         self._write_out)
 
477
                self._serve_one_request(protocol)
 
478
        except Exception, e:
 
479
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
480
            raise
 
481
 
 
482
    def _serve_one_request(self, protocol):
 
483
        """Read one request from input, process, send back a response.
 
484
        
 
485
        :param protocol: a SmartServerRequestProtocol.
 
486
        """
 
487
        try:
 
488
            self._serve_one_request_unguarded(protocol)
 
489
        except KeyboardInterrupt:
 
490
            raise
 
491
        except Exception, e:
 
492
            self.terminate_due_to_error()
 
493
 
 
494
    def terminate_due_to_error(self):
 
495
        """Called when an unhandled exception from the protocol occurs."""
 
496
        raise NotImplementedError(self.terminate_due_to_error)
 
497
 
 
498
 
 
499
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
 
500
 
 
501
    def __init__(self, sock, backing_transport):
 
502
        """Constructor.
 
503
 
 
504
        :param sock: the socket the server will read from.  It will be put
 
505
            into blocking mode.
 
506
        """
 
507
        SmartServerStreamMedium.__init__(self, backing_transport)
 
508
        self.push_back = ''
 
509
        sock.setblocking(True)
 
510
        self.socket = sock
 
511
 
 
512
    def _serve_one_request_unguarded(self, protocol):
 
513
        while protocol.next_read_size():
 
514
            if self.push_back:
 
515
                protocol.accept_bytes(self.push_back)
 
516
                self.push_back = ''
 
517
            else:
 
518
                bytes = self.socket.recv(4096)
 
519
                if bytes == '':
 
520
                    self.finished = True
 
521
                    return
 
522
                protocol.accept_bytes(bytes)
 
523
        
 
524
        self.push_back = protocol.excess_buffer
 
525
    
 
526
    def terminate_due_to_error(self):
 
527
        """Called when an unhandled exception from the protocol occurs."""
 
528
        # TODO: This should log to a server log file, but no such thing
 
529
        # exists yet.  Andrew Bennetts 2006-09-29.
 
530
        self.socket.close()
 
531
        self.finished = True
 
532
 
 
533
    def _write_out(self, bytes):
 
534
        self.socket.sendall(bytes)
 
535
 
 
536
 
 
537
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
 
538
 
 
539
    def __init__(self, in_file, out_file, backing_transport):
 
540
        """Construct new server.
 
541
 
 
542
        :param in_file: Python file from which requests can be read.
 
543
        :param out_file: Python file to write responses.
 
544
        :param backing_transport: Transport for the directory served.
 
545
        """
 
546
        SmartServerStreamMedium.__init__(self, backing_transport)
 
547
        self._in = in_file
 
548
        self._out = out_file
 
549
 
 
550
    def _serve_one_request_unguarded(self, protocol):
 
551
        while True:
 
552
            bytes_to_read = protocol.next_read_size()
 
553
            if bytes_to_read == 0:
 
554
                # Finished serving this request.
 
555
                self._out.flush()
 
556
                return
 
557
            bytes = self._in.read(bytes_to_read)
 
558
            if bytes == '':
 
559
                # Connection has been closed.
 
560
                self.finished = True
 
561
                self._out.flush()
 
562
                return
 
563
            protocol.accept_bytes(bytes)
 
564
 
 
565
    def terminate_due_to_error(self):
 
566
        # TODO: This should log to a server log file, but no such thing
 
567
        # exists yet.  Andrew Bennetts 2006-09-29.
 
568
        self._out.close()
 
569
        self.finished = True
 
570
 
 
571
    def _write_out(self, bytes):
 
572
        self._out.write(bytes)
 
573
 
 
574
 
 
575
class SmartServerResponse(object):
 
576
    """Response generated by SmartServerRequestHandler."""
 
577
 
 
578
    def __init__(self, args, body=None):
 
579
        self.args = args
 
580
        self.body = body
 
581
 
 
582
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
 
583
# for delivering the data for a request. This could be done with as the
 
584
# StreamServer, though that would create conflation between request and response
 
585
# which may be undesirable.
 
586
 
 
587
 
 
588
class SmartServerRequestHandler(object):
 
589
    """Protocol logic for smart server.
 
590
    
 
591
    This doesn't handle serialization at all, it just processes requests and
 
592
    creates responses.
 
593
    """
 
594
 
 
595
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
 
596
    # not contain encoding or decoding logic to allow the wire protocol to vary
 
597
    # from the object protocol: we will want to tweak the wire protocol separate
 
598
    # from the object model, and ideally we will be able to do that without
 
599
    # having a SmartServerRequestHandler subclass for each wire protocol, rather
 
600
    # just a Protocol subclass.
 
601
 
 
602
    # TODO: Better way of representing the body for commands that take it,
 
603
    # and allow it to be streamed into the server.
 
604
    
 
605
    def __init__(self, backing_transport):
 
606
        self._backing_transport = backing_transport
 
607
        self._converted_command = False
 
608
        self.finished_reading = False
 
609
        self._body_bytes = ''
 
610
        self.response = None
 
611
 
 
612
    def accept_body(self, bytes):
 
613
        """Accept body data.
 
614
 
 
615
        This should be overriden for each command that desired body data to
 
616
        handle the right format of that data. I.e. plain bytes, a bundle etc.
 
617
 
 
618
        The deserialisation into that format should be done in the Protocol
 
619
        object. Set self.desired_body_format to the format your method will
 
620
        handle.
 
621
        """
 
622
        # default fallback is to accumulate bytes.
 
623
        self._body_bytes += bytes
 
624
        
 
625
    def _end_of_body_handler(self):
 
626
        """An unimplemented end of body handler."""
 
627
        raise NotImplementedError(self._end_of_body_handler)
 
628
        
 
629
    def do_hello(self):
 
630
        """Answer a version request with my version."""
 
631
        return SmartServerResponse(('ok', '1'))
 
632
 
 
633
    def do_has(self, relpath):
 
634
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
 
635
        return SmartServerResponse((r,))
 
636
 
 
637
    def do_get(self, relpath):
 
638
        backing_bytes = self._backing_transport.get_bytes(relpath)
 
639
        return SmartServerResponse(('ok',), backing_bytes)
 
640
 
 
641
    def _deserialise_optional_mode(self, mode):
 
642
        # XXX: FIXME this should be on the protocol object.
 
643
        if mode == '':
 
644
            return None
 
645
        else:
 
646
            return int(mode)
 
647
 
 
648
    def do_append(self, relpath, mode):
 
649
        self._converted_command = True
 
650
        self._relpath = relpath
 
651
        self._mode = self._deserialise_optional_mode(mode)
 
652
        self._end_of_body_handler = self._handle_do_append_end
 
653
    
 
654
    def _handle_do_append_end(self):
 
655
        old_length = self._backing_transport.append_bytes(
 
656
            self._relpath, self._body_bytes, self._mode)
 
657
        self.response = SmartServerResponse(('appended', '%d' % old_length))
 
658
 
 
659
    def do_delete(self, relpath):
 
660
        self._backing_transport.delete(relpath)
 
661
 
 
662
    def do_iter_files_recursive(self, relpath):
 
663
        transport = self._backing_transport.clone(relpath)
 
664
        filenames = transport.iter_files_recursive()
 
665
        return SmartServerResponse(('names',) + tuple(filenames))
 
666
 
 
667
    def do_list_dir(self, relpath):
 
668
        filenames = self._backing_transport.list_dir(relpath)
 
669
        return SmartServerResponse(('names',) + tuple(filenames))
 
670
 
 
671
    def do_mkdir(self, relpath, mode):
 
672
        self._backing_transport.mkdir(relpath,
 
673
                                      self._deserialise_optional_mode(mode))
 
674
 
 
675
    def do_move(self, rel_from, rel_to):
 
676
        self._backing_transport.move(rel_from, rel_to)
 
677
 
 
678
    def do_put(self, relpath, mode):
 
679
        self._converted_command = True
 
680
        self._relpath = relpath
 
681
        self._mode = self._deserialise_optional_mode(mode)
 
682
        self._end_of_body_handler = self._handle_do_put
 
683
 
 
684
    def _handle_do_put(self):
 
685
        self._backing_transport.put_bytes(self._relpath,
 
686
                self._body_bytes, self._mode)
 
687
        self.response = SmartServerResponse(('ok',))
 
688
 
 
689
    def _deserialise_offsets(self, text):
 
690
        # XXX: FIXME this should be on the protocol object.
 
691
        offsets = []
 
692
        for line in text.split('\n'):
 
693
            if not line:
 
694
                continue
 
695
            start, length = line.split(',')
 
696
            offsets.append((int(start), int(length)))
 
697
        return offsets
 
698
 
 
699
    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
 
700
        self._converted_command = True
 
701
        self._end_of_body_handler = self._handle_put_non_atomic
 
702
        self._relpath = relpath
 
703
        self._dir_mode = self._deserialise_optional_mode(dir_mode)
 
704
        self._mode = self._deserialise_optional_mode(mode)
 
705
        # a boolean would be nicer XXX
 
706
        self._create_parent = (create_parent == 'T')
 
707
 
 
708
    def _handle_put_non_atomic(self):
 
709
        self._backing_transport.put_bytes_non_atomic(self._relpath,
 
710
                self._body_bytes,
 
711
                mode=self._mode,
 
712
                create_parent_dir=self._create_parent,
 
713
                dir_mode=self._dir_mode)
 
714
        self.response = SmartServerResponse(('ok',))
 
715
 
 
716
    def do_readv(self, relpath):
 
717
        self._converted_command = True
 
718
        self._end_of_body_handler = self._handle_readv_offsets
 
719
        self._relpath = relpath
 
720
 
 
721
    def end_of_body(self):
 
722
        """No more body data will be received."""
 
723
        self._run_handler_code(self._end_of_body_handler, (), {})
 
724
        # cannot read after this.
 
725
        self.finished_reading = True
 
726
 
 
727
    def _handle_readv_offsets(self):
 
728
        """accept offsets for a readv request."""
 
729
        offsets = self._deserialise_offsets(self._body_bytes)
 
730
        backing_bytes = ''.join(bytes for offset, bytes in
 
731
            self._backing_transport.readv(self._relpath, offsets))
 
732
        self.response = SmartServerResponse(('readv',), backing_bytes)
 
733
        
 
734
    def do_rename(self, rel_from, rel_to):
 
735
        self._backing_transport.rename(rel_from, rel_to)
 
736
 
 
737
    def do_rmdir(self, relpath):
 
738
        self._backing_transport.rmdir(relpath)
 
739
 
 
740
    def do_stat(self, relpath):
 
741
        stat = self._backing_transport.stat(relpath)
 
742
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
 
743
        
 
744
    def do_get_bundle(self, path, revision_id):
 
745
        # open transport relative to our base
 
746
        t = self._backing_transport.clone(path)
 
747
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
 
748
        repo = control.open_repository()
 
749
        tmpf = tempfile.TemporaryFile()
 
750
        base_revision = revision.NULL_REVISION
 
751
        write_bundle(repo, revision_id, base_revision, tmpf)
 
752
        tmpf.seek(0)
 
753
        return SmartServerResponse((), tmpf.read())
 
754
 
 
755
    def dispatch_command(self, cmd, args):
 
756
        """Deprecated compatibility method.""" # XXX XXX
 
757
        func = getattr(self, 'do_' + cmd, None)
 
758
        if func is None:
 
759
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
 
760
        self._run_handler_code(func, args, {})
 
761
 
 
762
    def _run_handler_code(self, callable, args, kwargs):
 
763
        """Run some handler specific code 'callable'.
 
764
 
 
765
        If a result is returned, it is considered to be the commands response,
 
766
        and finished_reading is set true, and its assigned to self.response.
 
767
 
 
768
        Any exceptions caught are translated and a response object created
 
769
        from them.
 
770
        """
 
771
        result = self._call_converting_errors(callable, args, kwargs)
 
772
        if result is not None:
 
773
            self.response = result
 
774
            self.finished_reading = True
 
775
        # handle unconverted commands
 
776
        if not self._converted_command:
 
777
            self.finished_reading = True
 
778
            if result is None:
 
779
                self.response = SmartServerResponse(('ok',))
 
780
 
 
781
    def _call_converting_errors(self, callable, args, kwargs):
 
782
        """Call callable converting errors to Response objects."""
 
783
        try:
 
784
            return callable(*args, **kwargs)
 
785
        except errors.NoSuchFile, e:
 
786
            return SmartServerResponse(('NoSuchFile', e.path))
 
787
        except errors.FileExists, e:
 
788
            return SmartServerResponse(('FileExists', e.path))
 
789
        except errors.DirectoryNotEmpty, e:
 
790
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
 
791
        except errors.ShortReadvError, e:
 
792
            return SmartServerResponse(('ShortReadvError',
 
793
                e.path, str(e.offset), str(e.length), str(e.actual)))
 
794
        except UnicodeError, e:
 
795
            # If it is a DecodeError, than most likely we are starting
 
796
            # with a plain string
 
797
            str_or_unicode = e.object
 
798
            if isinstance(str_or_unicode, unicode):
 
799
                # XXX: UTF-8 might have \x01 (our seperator byte) in it.  We
 
800
                # should escape it somehow.
 
801
                val = 'u:' + str_or_unicode.encode('utf-8')
 
802
            else:
 
803
                val = 's:' + str_or_unicode.encode('base64')
 
804
            # This handles UnicodeEncodeError or UnicodeDecodeError
 
805
            return SmartServerResponse((e.__class__.__name__,
 
806
                    e.encoding, val, str(e.start), str(e.end), e.reason))
 
807
        except errors.TransportNotPossible, e:
 
808
            if e.msg == "readonly transport":
 
809
                return SmartServerResponse(('ReadOnlyError', ))
 
810
            else:
 
811
                raise
 
812
 
 
813
 
 
814
class SmartTCPServer(object):
 
815
    """Listens on a TCP socket and accepts connections from smart clients"""
 
816
 
 
817
    def __init__(self, backing_transport, host='127.0.0.1', port=0):
 
818
        """Construct a new server.
 
819
 
 
820
        To actually start it running, call either start_background_thread or
 
821
        serve.
 
822
 
 
823
        :param host: Name of the interface to listen on.
 
824
        :param port: TCP port to listen on, or 0 to allocate a transient port.
 
825
        """
 
826
        self._server_socket = socket.socket()
 
827
        self._server_socket.bind((host, port))
 
828
        self.port = self._server_socket.getsockname()[1]
 
829
        self._server_socket.listen(1)
 
830
        self._server_socket.settimeout(1)
 
831
        self.backing_transport = backing_transport
 
832
 
 
833
    def serve(self):
 
834
        # let connections timeout so that we get a chance to terminate
 
835
        # Keep a reference to the exceptions we want to catch because the socket
 
836
        # module's globals get set to None during interpreter shutdown.
 
837
        from socket import timeout as socket_timeout
 
838
        from socket import error as socket_error
 
839
        self._should_terminate = False
 
840
        while not self._should_terminate:
 
841
            try:
 
842
                self.accept_and_serve()
 
843
            except socket_timeout:
 
844
                # just check if we're asked to stop
 
845
                pass
 
846
            except socket_error, e:
 
847
                trace.warning("client disconnected: %s", e)
 
848
                pass
 
849
 
 
850
    def get_url(self):
 
851
        """Return the url of the server"""
 
852
        return "bzr://%s:%d/" % self._server_socket.getsockname()
 
853
 
 
854
    def accept_and_serve(self):
 
855
        conn, client_addr = self._server_socket.accept()
 
856
        # For WIN32, where the timeout value from the listening socket
 
857
        # propogates to the newly accepted socket.
 
858
        conn.setblocking(True)
 
859
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
860
        handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
 
861
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
 
862
        connection_thread.setDaemon(True)
 
863
        connection_thread.start()
 
864
 
 
865
    def start_background_thread(self):
 
866
        self._server_thread = threading.Thread(None,
 
867
                self.serve,
 
868
                name='server-' + self.get_url())
 
869
        self._server_thread.setDaemon(True)
 
870
        self._server_thread.start()
 
871
 
 
872
    def stop_background_thread(self):
 
873
        self._should_terminate = True
 
874
        # At one point we would wait to join the threads here, but it looks
 
875
        # like they don't actually exit.  So now we just leave them running
 
876
        # and expect to terminate the process. -- mbp 20070215
 
877
        # self._server_socket.close()
 
878
        ## sys.stderr.write("waiting for server thread to finish...")
 
879
        ## self._server_thread.join()
 
880
 
 
881
 
 
882
class SmartTCPServer_for_testing(SmartTCPServer):
 
883
    """Server suitable for use by transport tests.
 
884
    
 
885
    This server is backed by the process's cwd.
 
886
    """
 
887
 
 
888
    def __init__(self):
 
889
        self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
 
890
        # The server is set up by default like for ssh access: the client
 
891
        # passes filesystem-absolute paths; therefore the server must look
 
892
        # them up relative to the root directory.  it might be better to act
 
893
        # a public server and have the server rewrite paths into the test
 
894
        # directory.
 
895
        SmartTCPServer.__init__(self,
 
896
            transport.get_transport(urlutils.local_path_to_url('/')))
 
897
        
 
898
    def setUp(self):
 
899
        """Set up server for testing"""
 
900
        self.start_background_thread()
 
901
 
 
902
    def tearDown(self):
 
903
        self.stop_background_thread()
 
904
 
 
905
    def get_url(self):
 
906
        """Return the url of the server"""
 
907
        host, port = self._server_socket.getsockname()
 
908
        return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
 
909
 
 
910
    def get_bogus_url(self):
 
911
        """Return a URL which will fail to connect"""
 
912
        return 'bzr://127.0.0.1:1/'
 
913
 
 
914
 
 
915
class SmartStat(object):
 
916
 
 
917
    def __init__(self, size, mode):
 
918
        self.st_size = size
 
919
        self.st_mode = mode
 
920
 
 
921
 
 
922
class SmartTransport(transport.Transport):
 
923
    """Connection to a smart server.
 
924
 
 
925
    The connection holds references to pipes that can be used to send requests
 
926
    to the server.
 
927
 
 
928
    The connection has a notion of the current directory to which it's
 
929
    connected; this is incorporated in filenames passed to the server.
 
930
    
 
931
    This supports some higher-level RPC operations and can also be treated 
 
932
    like a Transport to do file-like operations.
 
933
 
 
934
    The connection can be made over a tcp socket, or (in future) an ssh pipe
 
935
    or a series of http requests.  There are concrete subclasses for each
 
936
    type: SmartTCPTransport, etc.
 
937
    """
 
938
 
 
939
    # IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
 
940
    # responsibilities: Put those on SmartClient or similar. This is vital for
 
941
    # the ability to support multiple versions of the smart protocol over time:
 
942
    # SmartTransport is an adapter from the Transport object model to the 
 
943
    # SmartClient model, not an encoder.
 
944
 
 
945
    def __init__(self, url, clone_from=None, medium=None):
 
946
        """Constructor.
 
947
 
 
948
        :param medium: The medium to use for this RemoteTransport. This must be
 
949
            supplied if clone_from is None.
 
950
        """
 
951
        ### Technically super() here is faulty because Transport's __init__
 
952
        ### fails to take 2 parameters, and if super were to choose a silly
 
953
        ### initialisation order things would blow up. 
 
954
        if not url.endswith('/'):
 
955
            url += '/'
 
956
        super(SmartTransport, self).__init__(url)
 
957
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
 
958
                transport.split_url(url)
 
959
        if clone_from is None:
 
960
            self._medium = medium
 
961
        else:
 
962
            # credentials may be stripped from the base in some circumstances
 
963
            # as yet to be clearly defined or documented, so copy them.
 
964
            self._username = clone_from._username
 
965
            # reuse same connection
 
966
            self._medium = clone_from._medium
 
967
        assert self._medium is not None
 
968
 
 
969
    def abspath(self, relpath):
 
970
        """Return the full url to the given relative path.
 
971
        
 
972
        @param relpath: the relative path or path components
 
973
        @type relpath: str or list
 
974
        """
 
975
        return self._unparse_url(self._remote_path(relpath))
 
976
    
 
977
    def clone(self, relative_url):
 
978
        """Make a new SmartTransport related to me, sharing the same connection.
 
979
 
 
980
        This essentially opens a handle on a different remote directory.
 
981
        """
 
982
        if relative_url is None:
 
983
            return SmartTransport(self.base, self)
 
984
        else:
 
985
            return SmartTransport(self.abspath(relative_url), self)
 
986
 
 
987
    def is_readonly(self):
 
988
        """Smart server transport can do read/write file operations."""
 
989
        return False
 
990
                                                   
 
991
    def get_smart_client(self):
 
992
        return self._medium
 
993
 
 
994
    def get_smart_medium(self):
 
995
        return self._medium
 
996
                                                   
 
997
    def _unparse_url(self, path):
 
998
        """Return URL for a path.
 
999
 
 
1000
        :see: SFTPUrlHandling._unparse_url
 
1001
        """
 
1002
        # TODO: Eventually it should be possible to unify this with
 
1003
        # SFTPUrlHandling._unparse_url?
 
1004
        if path == '':
 
1005
            path = '/'
 
1006
        path = urllib.quote(path)
 
1007
        netloc = urllib.quote(self._host)
 
1008
        if self._username is not None:
 
1009
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
 
1010
        if self._port is not None:
 
1011
            netloc = '%s:%d' % (netloc, self._port)
 
1012
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
 
1013
 
 
1014
    def _remote_path(self, relpath):
 
1015
        """Returns the Unicode version of the absolute path for relpath."""
 
1016
        return self._combine_paths(self._path, relpath)
 
1017
 
 
1018
    def _call(self, method, *args):
 
1019
        resp = self._call2(method, *args)
 
1020
        self._translate_error(resp)
 
1021
 
 
1022
    def _call2(self, method, *args):
 
1023
        """Call a method on the remote server."""
 
1024
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1025
        protocol.call(method, *args)
 
1026
        return protocol.read_response_tuple()
 
1027
 
 
1028
    def _call_with_body_bytes(self, method, args, body):
 
1029
        """Call a method on the remote server with body bytes."""
 
1030
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1031
        protocol.call_with_body_bytes((method, ) + args, body)
 
1032
        return protocol.read_response_tuple()
 
1033
 
 
1034
    def has(self, relpath):
 
1035
        """Indicate whether a remote file of the given name exists or not.
 
1036
 
 
1037
        :see: Transport.has()
 
1038
        """
 
1039
        resp = self._call2('has', self._remote_path(relpath))
 
1040
        if resp == ('yes', ):
 
1041
            return True
 
1042
        elif resp == ('no', ):
 
1043
            return False
 
1044
        else:
 
1045
            self._translate_error(resp)
 
1046
 
 
1047
    def get(self, relpath):
 
1048
        """Return file-like object reading the contents of a remote file.
 
1049
        
 
1050
        :see: Transport.get_bytes()/get_file()
 
1051
        """
 
1052
        return StringIO(self.get_bytes(relpath))
 
1053
 
 
1054
    def get_bytes(self, relpath):
 
1055
        remote = self._remote_path(relpath)
 
1056
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1057
        protocol.call('get', remote)
 
1058
        resp = protocol.read_response_tuple(True)
 
1059
        if resp != ('ok', ):
 
1060
            protocol.cancel_read_body()
 
1061
            self._translate_error(resp, relpath)
 
1062
        return protocol.read_body_bytes()
 
1063
 
 
1064
    def _serialise_optional_mode(self, mode):
 
1065
        if mode is None:
 
1066
            return ''
 
1067
        else:
 
1068
            return '%d' % mode
 
1069
 
 
1070
    def mkdir(self, relpath, mode=None):
 
1071
        resp = self._call2('mkdir', self._remote_path(relpath),
 
1072
            self._serialise_optional_mode(mode))
 
1073
        self._translate_error(resp)
 
1074
 
 
1075
    def put_bytes(self, relpath, upload_contents, mode=None):
 
1076
        # FIXME: upload_file is probably not safe for non-ascii characters -
 
1077
        # should probably just pass all parameters as length-delimited
 
1078
        # strings?
 
1079
        resp = self._call_with_body_bytes('put',
 
1080
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
1081
            upload_contents)
 
1082
        self._translate_error(resp)
 
1083
 
 
1084
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
 
1085
                             create_parent_dir=False,
 
1086
                             dir_mode=None):
 
1087
        """See Transport.put_bytes_non_atomic."""
 
1088
        # FIXME: no encoding in the transport!
 
1089
        create_parent_str = 'F'
 
1090
        if create_parent_dir:
 
1091
            create_parent_str = 'T'
 
1092
 
 
1093
        resp = self._call_with_body_bytes(
 
1094
            'put_non_atomic',
 
1095
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
 
1096
             create_parent_str, self._serialise_optional_mode(dir_mode)),
 
1097
            bytes)
 
1098
        self._translate_error(resp)
 
1099
 
 
1100
    def put_file(self, relpath, upload_file, mode=None):
 
1101
        # its not ideal to seek back, but currently put_non_atomic_file depends
 
1102
        # on transports not reading before failing - which is a faulty
 
1103
        # assumption I think - RBC 20060915
 
1104
        pos = upload_file.tell()
 
1105
        try:
 
1106
            return self.put_bytes(relpath, upload_file.read(), mode)
 
1107
        except:
 
1108
            upload_file.seek(pos)
 
1109
            raise
 
1110
 
 
1111
    def put_file_non_atomic(self, relpath, f, mode=None,
 
1112
                            create_parent_dir=False,
 
1113
                            dir_mode=None):
 
1114
        return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
 
1115
                                         create_parent_dir=create_parent_dir,
 
1116
                                         dir_mode=dir_mode)
 
1117
 
 
1118
    def append_file(self, relpath, from_file, mode=None):
 
1119
        return self.append_bytes(relpath, from_file.read(), mode)
 
1120
        
 
1121
    def append_bytes(self, relpath, bytes, mode=None):
 
1122
        resp = self._call_with_body_bytes(
 
1123
            'append',
 
1124
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
1125
            bytes)
 
1126
        if resp[0] == 'appended':
 
1127
            return int(resp[1])
 
1128
        self._translate_error(resp)
 
1129
 
 
1130
    def delete(self, relpath):
 
1131
        resp = self._call2('delete', self._remote_path(relpath))
 
1132
        self._translate_error(resp)
 
1133
 
 
1134
    def readv(self, relpath, offsets):
 
1135
        if not offsets:
 
1136
            return
 
1137
 
 
1138
        offsets = list(offsets)
 
1139
 
 
1140
        sorted_offsets = sorted(offsets)
 
1141
        # turn the list of offsets into a stack
 
1142
        offset_stack = iter(offsets)
 
1143
        cur_offset_and_size = offset_stack.next()
 
1144
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
1145
                               limit=self._max_readv_combine,
 
1146
                               fudge_factor=self._bytes_to_read_before_seek))
 
1147
 
 
1148
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1149
        protocol.call_with_body_readv_array(
 
1150
            ('readv', self._remote_path(relpath)),
 
1151
            [(c.start, c.length) for c in coalesced])
 
1152
        resp = protocol.read_response_tuple(True)
 
1153
 
 
1154
        if resp[0] != 'readv':
 
1155
            # This should raise an exception
 
1156
            protocol.cancel_read_body()
 
1157
            self._translate_error(resp)
 
1158
            return
 
1159
 
 
1160
        # FIXME: this should know how many bytes are needed, for clarity.
 
1161
        data = protocol.read_body_bytes()
 
1162
        # Cache the results, but only until they have been fulfilled
 
1163
        data_map = {}
 
1164
        for c_offset in coalesced:
 
1165
            if len(data) < c_offset.length:
 
1166
                raise errors.ShortReadvError(relpath, c_offset.start,
 
1167
                            c_offset.length, actual=len(data))
 
1168
            for suboffset, subsize in c_offset.ranges:
 
1169
                key = (c_offset.start+suboffset, subsize)
 
1170
                data_map[key] = data[suboffset:suboffset+subsize]
 
1171
            data = data[c_offset.length:]
 
1172
 
 
1173
            # Now that we've read some data, see if we can yield anything back
 
1174
            while cur_offset_and_size in data_map:
 
1175
                this_data = data_map.pop(cur_offset_and_size)
 
1176
                yield cur_offset_and_size[0], this_data
 
1177
                cur_offset_and_size = offset_stack.next()
 
1178
 
 
1179
    def rename(self, rel_from, rel_to):
 
1180
        self._call('rename',
 
1181
                   self._remote_path(rel_from),
 
1182
                   self._remote_path(rel_to))
 
1183
 
 
1184
    def move(self, rel_from, rel_to):
 
1185
        self._call('move',
 
1186
                   self._remote_path(rel_from),
 
1187
                   self._remote_path(rel_to))
 
1188
 
 
1189
    def rmdir(self, relpath):
 
1190
        resp = self._call('rmdir', self._remote_path(relpath))
 
1191
 
 
1192
    def _translate_error(self, resp, orig_path=None):
 
1193
        """Raise an exception from a response"""
 
1194
        if resp is None:
 
1195
            what = None
 
1196
        else:
 
1197
            what = resp[0]
 
1198
        if what == 'ok':
 
1199
            return
 
1200
        elif what == 'NoSuchFile':
 
1201
            if orig_path is not None:
 
1202
                error_path = orig_path
 
1203
            else:
 
1204
                error_path = resp[1]
 
1205
            raise errors.NoSuchFile(error_path)
 
1206
        elif what == 'error':
 
1207
            raise errors.SmartProtocolError(unicode(resp[1]))
 
1208
        elif what == 'FileExists':
 
1209
            raise errors.FileExists(resp[1])
 
1210
        elif what == 'DirectoryNotEmpty':
 
1211
            raise errors.DirectoryNotEmpty(resp[1])
 
1212
        elif what == 'ShortReadvError':
 
1213
            raise errors.ShortReadvError(resp[1], int(resp[2]),
 
1214
                                         int(resp[3]), int(resp[4]))
 
1215
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
 
1216
            encoding = str(resp[1]) # encoding must always be a string
 
1217
            val = resp[2]
 
1218
            start = int(resp[3])
 
1219
            end = int(resp[4])
 
1220
            reason = str(resp[5]) # reason must always be a string
 
1221
            if val.startswith('u:'):
 
1222
                val = val[2:].decode('utf-8')
 
1223
            elif val.startswith('s:'):
 
1224
                val = val[2:].decode('base64')
 
1225
            if what == 'UnicodeDecodeError':
 
1226
                raise UnicodeDecodeError(encoding, val, start, end, reason)
 
1227
            elif what == 'UnicodeEncodeError':
 
1228
                raise UnicodeEncodeError(encoding, val, start, end, reason)
 
1229
        elif what == "ReadOnlyError":
 
1230
            raise errors.TransportNotPossible('readonly transport')
 
1231
        else:
 
1232
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
 
1233
 
 
1234
    def disconnect(self):
 
1235
        self._medium.disconnect()
 
1236
 
 
1237
    def delete_tree(self, relpath):
 
1238
        raise errors.TransportNotPossible('readonly transport')
 
1239
 
 
1240
    def stat(self, relpath):
 
1241
        resp = self._call2('stat', self._remote_path(relpath))
 
1242
        if resp[0] == 'stat':
 
1243
            return SmartStat(int(resp[1]), int(resp[2], 8))
 
1244
        else:
 
1245
            self._translate_error(resp)
 
1246
 
 
1247
    ## def lock_read(self, relpath):
 
1248
    ##     """Lock the given file for shared (read) access.
 
1249
    ##     :return: A lock object, which should be passed to Transport.unlock()
 
1250
    ##     """
 
1251
    ##     # The old RemoteBranch ignore lock for reading, so we will
 
1252
    ##     # continue that tradition and return a bogus lock object.
 
1253
    ##     class BogusLock(object):
 
1254
    ##         def __init__(self, path):
 
1255
    ##             self.path = path
 
1256
    ##         def unlock(self):
 
1257
    ##             pass
 
1258
    ##     return BogusLock(relpath)
 
1259
 
 
1260
    def listable(self):
 
1261
        return True
 
1262
 
 
1263
    def list_dir(self, relpath):
 
1264
        resp = self._call2('list_dir', self._remote_path(relpath))
 
1265
        if resp[0] == 'names':
 
1266
            return [name.encode('ascii') for name in resp[1:]]
 
1267
        else:
 
1268
            self._translate_error(resp)
 
1269
 
 
1270
    def iter_files_recursive(self):
 
1271
        resp = self._call2('iter_files_recursive', self._remote_path(''))
 
1272
        if resp[0] == 'names':
 
1273
            return resp[1:]
 
1274
        else:
 
1275
            self._translate_error(resp)
 
1276
 
 
1277
 
 
1278
class SmartClientMediumRequest(object):
 
1279
    """A request on a SmartClientMedium.
 
1280
 
 
1281
    Each request allows bytes to be provided to it via accept_bytes, and then
 
1282
    the response bytes to be read via read_bytes.
 
1283
 
 
1284
    For instance:
 
1285
    request.accept_bytes('123')
 
1286
    request.finished_writing()
 
1287
    result = request.read_bytes(3)
 
1288
    request.finished_reading()
 
1289
 
 
1290
    It is up to the individual SmartClientMedium whether multiple concurrent
 
1291
    requests can exist. See SmartClientMedium.get_request to obtain instances 
 
1292
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
1293
    details on concurrency and pipelining.
 
1294
    """
 
1295
 
 
1296
    def __init__(self, medium):
 
1297
        """Construct a SmartClientMediumRequest for the medium medium."""
 
1298
        self._medium = medium
 
1299
        # we track state by constants - we may want to use the same
 
1300
        # pattern as BodyReader if it gets more complex.
 
1301
        # valid states are: "writing", "reading", "done"
 
1302
        self._state = "writing"
 
1303
 
 
1304
    def accept_bytes(self, bytes):
 
1305
        """Accept bytes for inclusion in this request.
 
1306
 
 
1307
        This method may not be be called after finished_writing() has been
 
1308
        called.  It depends upon the Medium whether or not the bytes will be
 
1309
        immediately transmitted. Message based Mediums will tend to buffer the
 
1310
        bytes until finished_writing() is called.
 
1311
 
 
1312
        :param bytes: A bytestring.
 
1313
        """
 
1314
        if self._state != "writing":
 
1315
            raise errors.WritingCompleted(self)
 
1316
        self._accept_bytes(bytes)
 
1317
 
 
1318
    def _accept_bytes(self, bytes):
 
1319
        """Helper for accept_bytes.
 
1320
 
 
1321
        Accept_bytes checks the state of the request to determing if bytes
 
1322
        should be accepted. After that it hands off to _accept_bytes to do the
 
1323
        actual acceptance.
 
1324
        """
 
1325
        raise NotImplementedError(self._accept_bytes)
 
1326
 
 
1327
    def finished_reading(self):
 
1328
        """Inform the request that all desired data has been read.
 
1329
 
 
1330
        This will remove the request from the pipeline for its medium (if the
 
1331
        medium supports pipelining) and any further calls to methods on the
 
1332
        request will raise ReadingCompleted.
 
1333
        """
 
1334
        if self._state == "writing":
 
1335
            raise errors.WritingNotComplete(self)
 
1336
        if self._state != "reading":
 
1337
            raise errors.ReadingCompleted(self)
 
1338
        self._state = "done"
 
1339
        self._finished_reading()
 
1340
 
 
1341
    def _finished_reading(self):
 
1342
        """Helper for finished_reading.
 
1343
 
 
1344
        finished_reading checks the state of the request to determine if 
 
1345
        finished_reading is allowed, and if it is hands off to _finished_reading
 
1346
        to perform the action.
 
1347
        """
 
1348
        raise NotImplementedError(self._finished_reading)
 
1349
 
 
1350
    def finished_writing(self):
 
1351
        """Finish the writing phase of this request.
 
1352
 
 
1353
        This will flush all pending data for this request along the medium.
 
1354
        After calling finished_writing, you may not call accept_bytes anymore.
 
1355
        """
 
1356
        if self._state != "writing":
 
1357
            raise errors.WritingCompleted(self)
 
1358
        self._state = "reading"
 
1359
        self._finished_writing()
 
1360
 
 
1361
    def _finished_writing(self):
 
1362
        """Helper for finished_writing.
 
1363
 
 
1364
        finished_writing checks the state of the request to determine if 
 
1365
        finished_writing is allowed, and if it is hands off to _finished_writing
 
1366
        to perform the action.
 
1367
        """
 
1368
        raise NotImplementedError(self._finished_writing)
 
1369
 
 
1370
    def read_bytes(self, count):
 
1371
        """Read bytes from this requests response.
 
1372
 
 
1373
        This method will block and wait for count bytes to be read. It may not
 
1374
        be invoked until finished_writing() has been called - this is to ensure
 
1375
        a message-based approach to requests, for compatability with message
 
1376
        based mediums like HTTP.
 
1377
        """
 
1378
        if self._state == "writing":
 
1379
            raise errors.WritingNotComplete(self)
 
1380
        if self._state != "reading":
 
1381
            raise errors.ReadingCompleted(self)
 
1382
        return self._read_bytes(count)
 
1383
 
 
1384
    def _read_bytes(self, count):
 
1385
        """Helper for read_bytes.
 
1386
 
 
1387
        read_bytes checks the state of the request to determing if bytes
 
1388
        should be read. After that it hands off to _read_bytes to do the
 
1389
        actual read.
 
1390
        """
 
1391
        raise NotImplementedError(self._read_bytes)
 
1392
 
 
1393
 
 
1394
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
 
1395
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
 
1396
 
 
1397
    def __init__(self, medium):
 
1398
        SmartClientMediumRequest.__init__(self, medium)
 
1399
        # check that we are safe concurrency wise. If some streams start
 
1400
        # allowing concurrent requests - i.e. via multiplexing - then this
 
1401
        # assert should be moved to SmartClientStreamMedium.get_request,
 
1402
        # and the setting/unsetting of _current_request likewise moved into
 
1403
        # that class : but its unneeded overhead for now. RBC 20060922
 
1404
        if self._medium._current_request is not None:
 
1405
            raise errors.TooManyConcurrentRequests(self._medium)
 
1406
        self._medium._current_request = self
 
1407
 
 
1408
    def _accept_bytes(self, bytes):
 
1409
        """See SmartClientMediumRequest._accept_bytes.
 
1410
        
 
1411
        This forwards to self._medium._accept_bytes because we are operating
 
1412
        on the mediums stream.
 
1413
        """
 
1414
        self._medium._accept_bytes(bytes)
 
1415
 
 
1416
    def _finished_reading(self):
 
1417
        """See SmartClientMediumRequest._finished_reading.
 
1418
 
 
1419
        This clears the _current_request on self._medium to allow a new 
 
1420
        request to be created.
 
1421
        """
 
1422
        assert self._medium._current_request is self
 
1423
        self._medium._current_request = None
 
1424
        
 
1425
    def _finished_writing(self):
 
1426
        """See SmartClientMediumRequest._finished_writing.
 
1427
 
 
1428
        This invokes self._medium._flush to ensure all bytes are transmitted.
 
1429
        """
 
1430
        self._medium._flush()
 
1431
 
 
1432
    def _read_bytes(self, count):
 
1433
        """See SmartClientMediumRequest._read_bytes.
 
1434
        
 
1435
        This forwards to self._medium._read_bytes because we are operating
 
1436
        on the mediums stream.
 
1437
        """
 
1438
        return self._medium._read_bytes(count)
 
1439
 
 
1440
 
 
1441
class SmartClientRequestProtocolOne(SmartProtocolBase):
 
1442
    """The client-side protocol for smart version 1."""
 
1443
 
 
1444
    def __init__(self, request):
 
1445
        """Construct a SmartClientRequestProtocolOne.
 
1446
 
 
1447
        :param request: A SmartClientMediumRequest to serialise onto and
 
1448
            deserialise from.
 
1449
        """
 
1450
        self._request = request
 
1451
        self._body_buffer = None
 
1452
 
 
1453
    def call(self, *args):
 
1454
        bytes = _encode_tuple(args)
 
1455
        self._request.accept_bytes(bytes)
 
1456
        self._request.finished_writing()
 
1457
 
 
1458
    def call_with_body_bytes(self, args, body):
 
1459
        """Make a remote call of args with body bytes 'body'.
 
1460
 
 
1461
        After calling this, call read_response_tuple to find the result out.
 
1462
        """
 
1463
        bytes = _encode_tuple(args)
 
1464
        self._request.accept_bytes(bytes)
 
1465
        bytes = self._encode_bulk_data(body)
 
1466
        self._request.accept_bytes(bytes)
 
1467
        self._request.finished_writing()
 
1468
 
 
1469
    def call_with_body_readv_array(self, args, body):
 
1470
        """Make a remote call with a readv array.
 
1471
 
 
1472
        The body is encoded with one line per readv offset pair. The numbers in
 
1473
        each pair are separated by a comma, and no trailing \n is emitted.
 
1474
        """
 
1475
        bytes = _encode_tuple(args)
 
1476
        self._request.accept_bytes(bytes)
 
1477
        readv_bytes = self._serialise_offsets(body)
 
1478
        bytes = self._encode_bulk_data(readv_bytes)
 
1479
        self._request.accept_bytes(bytes)
 
1480
        self._request.finished_writing()
 
1481
 
 
1482
    def cancel_read_body(self):
 
1483
        """After expecting a body, a response code may indicate one otherwise.
 
1484
 
 
1485
        This method lets the domain client inform the protocol that no body
 
1486
        will be transmitted. This is a terminal method: after calling it the
 
1487
        protocol is not able to be used further.
 
1488
        """
 
1489
        self._request.finished_reading()
 
1490
 
 
1491
    def read_response_tuple(self, expect_body=False):
 
1492
        """Read a response tuple from the wire.
 
1493
 
 
1494
        This should only be called once.
 
1495
        """
 
1496
        result = self._recv_tuple()
 
1497
        if not expect_body:
 
1498
            self._request.finished_reading()
 
1499
        return result
 
1500
 
 
1501
    def read_body_bytes(self, count=-1):
 
1502
        """Read bytes from the body, decoding into a byte stream.
 
1503
        
 
1504
        We read all bytes at once to ensure we've checked the trailer for 
 
1505
        errors, and then feed the buffer back as read_body_bytes is called.
 
1506
        """
 
1507
        if self._body_buffer is not None:
 
1508
            return self._body_buffer.read(count)
 
1509
        _body_decoder = LengthPrefixedBodyDecoder()
 
1510
 
 
1511
        while not _body_decoder.finished_reading:
 
1512
            bytes_wanted = _body_decoder.next_read_size()
 
1513
            bytes = self._request.read_bytes(bytes_wanted)
 
1514
            _body_decoder.accept_bytes(bytes)
 
1515
        self._request.finished_reading()
 
1516
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
 
1517
        # XXX: TODO check the trailer result.
 
1518
        return self._body_buffer.read(count)
 
1519
 
 
1520
    def _recv_tuple(self):
 
1521
        """Receive a tuple from the medium request."""
 
1522
        line = ''
 
1523
        while not line or line[-1] != '\n':
 
1524
            # TODO: this is inefficient - but tuples are short.
 
1525
            new_char = self._request.read_bytes(1)
 
1526
            line += new_char
 
1527
            assert new_char != '', "end of file reading from server."
 
1528
        return _decode_tuple(line)
 
1529
 
 
1530
    def query_version(self):
 
1531
        """Return protocol version number of the server."""
 
1532
        self.call('hello')
 
1533
        resp = self.read_response_tuple()
 
1534
        if resp == ('ok', '1'):
 
1535
            return 1
 
1536
        else:
 
1537
            raise errors.SmartProtocolError("bad response %r" % (resp,))
 
1538
 
 
1539
 
 
1540
class SmartClientMedium(object):
 
1541
    """Smart client is a medium for sending smart protocol requests over."""
 
1542
 
 
1543
    def disconnect(self):
 
1544
        """If this medium maintains a persistent connection, close it.
 
1545
        
 
1546
        The default implementation does nothing.
 
1547
        """
 
1548
        
 
1549
 
 
1550
class SmartClientStreamMedium(SmartClientMedium):
 
1551
    """Stream based medium common class.
 
1552
 
 
1553
    SmartClientStreamMediums operate on a stream. All subclasses use a common
 
1554
    SmartClientStreamMediumRequest for their requests, and should implement
 
1555
    _accept_bytes and _read_bytes to allow the request objects to send and
 
1556
    receive bytes.
 
1557
    """
 
1558
 
 
1559
    def __init__(self):
 
1560
        self._current_request = None
 
1561
 
 
1562
    def accept_bytes(self, bytes):
 
1563
        self._accept_bytes(bytes)
 
1564
 
 
1565
    def __del__(self):
 
1566
        """The SmartClientStreamMedium knows how to close the stream when it is
 
1567
        finished with it.
 
1568
        """
 
1569
        self.disconnect()
 
1570
 
 
1571
    def _flush(self):
 
1572
        """Flush the output stream.
 
1573
        
 
1574
        This method is used by the SmartClientStreamMediumRequest to ensure that
 
1575
        all data for a request is sent, to avoid long timeouts or deadlocks.
 
1576
        """
 
1577
        raise NotImplementedError(self._flush)
 
1578
 
 
1579
    def get_request(self):
 
1580
        """See SmartClientMedium.get_request().
 
1581
 
 
1582
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
 
1583
        for get_request.
 
1584
        """
 
1585
        return SmartClientStreamMediumRequest(self)
 
1586
 
 
1587
    def read_bytes(self, count):
 
1588
        return self._read_bytes(count)
 
1589
 
 
1590
 
 
1591
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
 
1592
    """A client medium using simple pipes.
 
1593
    
 
1594
    This client does not manage the pipes: it assumes they will always be open.
 
1595
    """
 
1596
 
 
1597
    def __init__(self, readable_pipe, writeable_pipe):
 
1598
        SmartClientStreamMedium.__init__(self)
 
1599
        self._readable_pipe = readable_pipe
 
1600
        self._writeable_pipe = writeable_pipe
 
1601
 
 
1602
    def _accept_bytes(self, bytes):
 
1603
        """See SmartClientStreamMedium.accept_bytes."""
 
1604
        self._writeable_pipe.write(bytes)
 
1605
 
 
1606
    def _flush(self):
 
1607
        """See SmartClientStreamMedium._flush()."""
 
1608
        self._writeable_pipe.flush()
 
1609
 
 
1610
    def _read_bytes(self, count):
 
1611
        """See SmartClientStreamMedium._read_bytes."""
 
1612
        return self._readable_pipe.read(count)
 
1613
 
 
1614
 
 
1615
class SmartSSHClientMedium(SmartClientStreamMedium):
 
1616
    """A client medium using SSH."""
 
1617
    
 
1618
    def __init__(self, host, port=None, username=None, password=None,
 
1619
            vendor=None):
 
1620
        """Creates a client that will connect on the first use.
 
1621
        
 
1622
        :param vendor: An optional override for the ssh vendor to use. See
 
1623
            bzrlib.transport.ssh for details on ssh vendors.
 
1624
        """
 
1625
        SmartClientStreamMedium.__init__(self)
 
1626
        self._connected = False
 
1627
        self._host = host
 
1628
        self._password = password
 
1629
        self._port = port
 
1630
        self._username = username
 
1631
        self._read_from = None
 
1632
        self._ssh_connection = None
 
1633
        self._vendor = vendor
 
1634
        self._write_to = None
 
1635
 
 
1636
    def _accept_bytes(self, bytes):
 
1637
        """See SmartClientStreamMedium.accept_bytes."""
 
1638
        self._ensure_connection()
 
1639
        self._write_to.write(bytes)
 
1640
 
 
1641
    def disconnect(self):
 
1642
        """See SmartClientMedium.disconnect()."""
 
1643
        if not self._connected:
 
1644
            return
 
1645
        self._read_from.close()
 
1646
        self._write_to.close()
 
1647
        self._ssh_connection.close()
 
1648
        self._connected = False
 
1649
 
 
1650
    def _ensure_connection(self):
 
1651
        """Connect this medium if not already connected."""
 
1652
        if self._connected:
 
1653
            return
 
1654
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
1655
        if self._vendor is None:
 
1656
            vendor = ssh._get_ssh_vendor()
 
1657
        else:
 
1658
            vendor = self._vendor
 
1659
        self._ssh_connection = vendor.connect_ssh(self._username,
 
1660
                self._password, self._host, self._port,
 
1661
                command=[executable, 'serve', '--inet', '--directory=/',
 
1662
                         '--allow-writes'])
 
1663
        self._read_from, self._write_to = \
 
1664
            self._ssh_connection.get_filelike_channels()
 
1665
        self._connected = True
 
1666
 
 
1667
    def _flush(self):
 
1668
        """See SmartClientStreamMedium._flush()."""
 
1669
        self._write_to.flush()
 
1670
 
 
1671
    def _read_bytes(self, count):
 
1672
        """See SmartClientStreamMedium.read_bytes."""
 
1673
        if not self._connected:
 
1674
            raise errors.MediumNotConnected(self)
 
1675
        return self._read_from.read(count)
 
1676
 
 
1677
 
 
1678
class SmartTCPClientMedium(SmartClientStreamMedium):
 
1679
    """A client medium using TCP."""
 
1680
    
 
1681
    def __init__(self, host, port):
 
1682
        """Creates a client that will connect on the first use."""
 
1683
        SmartClientStreamMedium.__init__(self)
 
1684
        self._connected = False
 
1685
        self._host = host
 
1686
        self._port = port
 
1687
        self._socket = None
 
1688
 
 
1689
    def _accept_bytes(self, bytes):
 
1690
        """See SmartClientMedium.accept_bytes."""
 
1691
        self._ensure_connection()
 
1692
        self._socket.sendall(bytes)
 
1693
 
 
1694
    def disconnect(self):
 
1695
        """See SmartClientMedium.disconnect()."""
 
1696
        if not self._connected:
 
1697
            return
 
1698
        self._socket.close()
 
1699
        self._socket = None
 
1700
        self._connected = False
 
1701
 
 
1702
    def _ensure_connection(self):
 
1703
        """Connect this medium if not already connected."""
 
1704
        if self._connected:
 
1705
            return
 
1706
        self._socket = socket.socket()
 
1707
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
1708
        result = self._socket.connect_ex((self._host, int(self._port)))
 
1709
        if result:
 
1710
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
 
1711
                    (self._host, self._port, os.strerror(result)))
 
1712
        self._connected = True
 
1713
 
 
1714
    def _flush(self):
 
1715
        """See SmartClientStreamMedium._flush().
 
1716
        
 
1717
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
1718
        add a means to do a flush, but that can be done in the future.
 
1719
        """
 
1720
 
 
1721
    def _read_bytes(self, count):
 
1722
        """See SmartClientMedium.read_bytes."""
 
1723
        if not self._connected:
 
1724
            raise errors.MediumNotConnected(self)
 
1725
        return self._socket.recv(count)
 
1726
 
 
1727
 
 
1728
class SmartTCPTransport(SmartTransport):
 
1729
    """Connection to smart server over plain tcp.
 
1730
    
 
1731
    This is essentially just a factory to get 'RemoteTransport(url,
 
1732
        SmartTCPClientMedium).
 
1733
    """
 
1734
 
 
1735
    def __init__(self, url):
 
1736
        _scheme, _username, _password, _host, _port, _path = \
 
1737
            transport.split_url(url)
 
1738
        if _port is None:
 
1739
            _port = BZR_DEFAULT_PORT
 
1740
        else:
 
1741
            try:
 
1742
                _port = int(_port)
 
1743
            except (ValueError, TypeError), e:
 
1744
                raise errors.InvalidURL(
 
1745
                    path=url, extra="invalid port %s" % _port)
 
1746
        medium = SmartTCPClientMedium(_host, _port)
 
1747
        super(SmartTCPTransport, self).__init__(url, medium=medium)
 
1748
 
 
1749
 
 
1750
class SmartSSHTransport(SmartTransport):
 
1751
    """Connection to smart server over SSH.
 
1752
 
 
1753
    This is essentially just a factory to get 'RemoteTransport(url,
 
1754
        SmartSSHClientMedium).
 
1755
    """
 
1756
 
 
1757
    def __init__(self, url):
 
1758
        _scheme, _username, _password, _host, _port, _path = \
 
1759
            transport.split_url(url)
 
1760
        try:
 
1761
            if _port is not None:
 
1762
                _port = int(_port)
 
1763
        except (ValueError, TypeError), e:
 
1764
            raise errors.InvalidURL(path=url, extra="invalid port %s" % 
 
1765
                _port)
 
1766
        medium = SmartSSHClientMedium(_host, _port, _username, _password)
 
1767
        super(SmartSSHTransport, self).__init__(url, medium=medium)
 
1768
 
 
1769
 
 
1770
class SmartHTTPTransport(SmartTransport):
 
1771
    """Just a way to connect between a bzr+http:// url and http://.
 
1772
    
 
1773
    This connection operates slightly differently than the SmartSSHTransport.
 
1774
    It uses a plain http:// transport underneath, which defines what remote
 
1775
    .bzr/smart URL we are connected to. From there, all paths that are sent are
 
1776
    sent as relative paths, this way, the remote side can properly
 
1777
    de-reference them, since it is likely doing rewrite rules to translate an
 
1778
    HTTP path into a local path.
 
1779
    """
 
1780
 
 
1781
    def __init__(self, url, http_transport=None):
 
1782
        assert url.startswith('bzr+http://')
 
1783
 
 
1784
        if http_transport is None:
 
1785
            http_url = url[len('bzr+'):]
 
1786
            self._http_transport = transport.get_transport(http_url)
 
1787
        else:
 
1788
            self._http_transport = http_transport
 
1789
        http_medium = self._http_transport.get_smart_medium()
 
1790
        super(SmartHTTPTransport, self).__init__(url, medium=http_medium)
 
1791
 
 
1792
    def _remote_path(self, relpath):
 
1793
        """After connecting HTTP Transport only deals in relative URLs."""
 
1794
        if relpath == '.':
 
1795
            return ''
 
1796
        else:
 
1797
            return relpath
 
1798
 
 
1799
    def abspath(self, relpath):
 
1800
        """Return the full url to the given relative path.
 
1801
        
 
1802
        :param relpath: the relative path or path components
 
1803
        :type relpath: str or list
 
1804
        """
 
1805
        return self._unparse_url(self._combine_paths(self._path, relpath))
 
1806
 
 
1807
    def clone(self, relative_url):
 
1808
        """Make a new SmartHTTPTransport related to me.
 
1809
 
 
1810
        This is re-implemented rather than using the default
 
1811
        SmartTransport.clone() because we must be careful about the underlying
 
1812
        http transport.
 
1813
        """
 
1814
        if relative_url:
 
1815
            abs_url = self.abspath(relative_url)
 
1816
        else:
 
1817
            abs_url = self.base
 
1818
        # By cloning the underlying http_transport, we are able to share the
 
1819
        # connection.
 
1820
        new_transport = self._http_transport.clone(relative_url)
 
1821
        return SmartHTTPTransport(abs_url, http_transport=new_transport)
 
1822
 
 
1823
 
 
1824
def get_test_permutations():
 
1825
    """Return (transport, server) permutations for testing."""
 
1826
    ### We may need a little more test framework support to construct an
 
1827
    ### appropriate RemoteTransport in the future.
 
1828
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]