/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

  • Committer: Martin Pool
  • Date: 2007-04-01 06:19:16 UTC
  • mfrom: (2323.5.20 0.15-integration)
  • mto: This revision was merged to the branch mainline in revision 2390.
  • Revision ID: mbp@sourcefrog.net-20070401061916-plpgsxdf8g7gll9o
Merge 0.15 final release back to trunk, including: recommend upgrades of old workingtrees, handle multiple http redirections, some dirstate fixes, 

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
 
17
"""Smart-server protocol, client and server.
 
18
 
 
19
Requests are sent as a command and list of arguments, followed by optional
 
20
bulk body data.  Responses are similarly a response and list of arguments,
 
21
followed by bulk body data. ::
 
22
 
 
23
  SEP := '\001'
 
24
    Fields are separated by Ctrl-A.
 
25
  BULK_DATA := CHUNK TRAILER
 
26
    Chunks can be repeated as many times as necessary.
 
27
  CHUNK := CHUNK_LEN CHUNK_BODY
 
28
  CHUNK_LEN := DIGIT+ NEWLINE
 
29
    Gives the number of bytes in the following chunk.
 
30
  CHUNK_BODY := BYTE[chunk_len]
 
31
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
 
32
  SUCCESS_TRAILER := 'done' NEWLINE
 
33
  ERROR_TRAILER := 
 
34
 
 
35
Paths are passed across the network.  The client needs to see a namespace that
 
36
includes any repository that might need to be referenced, and the client needs
 
37
to know about a root directory beyond which it cannot ascend.
 
38
 
 
39
Servers run over ssh will typically want to be able to access any path the user 
 
40
can access.  Public servers on the other hand (which might be over http, ssh
 
41
or tcp) will typically want to restrict access to only a particular directory 
 
42
and its children, so will want to do a software virtual root at that level.
 
43
In other words they'll want to rewrite incoming paths to be under that level
 
44
(and prevent escaping using ../ tricks.)
 
45
 
 
46
URLs that include ~ should probably be passed across to the server verbatim
 
47
and the server can expand them.  This will proably not be meaningful when 
 
48
limited to a directory?
 
49
 
 
50
At the bottom level socket, pipes, HTTP server.  For sockets, we have the idea
 
51
that you have multiple requests and get a read error because the other side did
 
52
shutdown.  For pipes we have read pipe which will have a zero read which marks
 
53
end-of-file.  For HTTP server environment there is not end-of-stream because
 
54
each request coming into the server is independent.
 
55
 
 
56
So we need a wrapper around pipes and sockets to seperate out requests from
 
57
substrate and this will give us a single model which is consist for HTTP,
 
58
sockets and pipes.
 
59
 
 
60
Server-side
 
61
-----------
 
62
 
 
63
 MEDIUM  (factory for protocol, reads bytes & pushes to protocol,
 
64
          uses protocol to detect end-of-request, sends written
 
65
          bytes to client) e.g. socket, pipe, HTTP request handler.
 
66
  ^
 
67
  | bytes.
 
68
  v
 
69
 
 
70
PROTOCOL  (serialization, deserialization)  accepts bytes for one
 
71
          request, decodes according to internal state, pushes
 
72
          structured data to handler.  accepts structured data from
 
73
          handler and encodes and writes to the medium.  factory for
 
74
          handler.
 
75
  ^
 
76
  | structured data
 
77
  v
 
78
 
 
79
HANDLER   (domain logic) accepts structured data, operates state
 
80
          machine until the request can be satisfied,
 
81
          sends structured data to the protocol.
 
82
 
 
83
 
 
84
Client-side
 
85
-----------
 
86
 
 
87
 CLIENT             domain logic, accepts domain requests, generated structured
 
88
                    data, reads structured data from responses and turns into
 
89
                    domain data.  Sends structured data to the protocol.
 
90
                    Operates state machines until the request can be delivered
 
91
                    (e.g. reading from a bundle generated in bzrlib to deliver a
 
92
                    complete request).
 
93
 
 
94
                    Possibly this should just be RemoteBzrDir, RemoteTransport,
 
95
                    ...
 
96
  ^
 
97
  | structured data
 
98
  v
 
99
 
 
100
PROTOCOL  (serialization, deserialization)  accepts structured data for one
 
101
          request, encodes and writes to the medium.  Reads bytes from the
 
102
          medium, decodes and allows the client to read structured data.
 
103
  ^
 
104
  | bytes.
 
105
  v
 
106
 
 
107
 MEDIUM  (accepts bytes from the protocol & delivers to the remote server.
 
108
          Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
 
109
"""
 
110
 
 
111
 
 
112
# TODO: _translate_error should be on the client, not the transport because
 
113
#     error coding is wire protocol specific.
 
114
 
 
115
# TODO: A plain integer from query_version is too simple; should give some
 
116
# capabilities too?
 
117
 
 
118
# TODO: Server should probably catch exceptions within itself and send them
 
119
# back across the network.  (But shouldn't catch KeyboardInterrupt etc)
 
120
# Also needs to somehow report protocol errors like bad requests.  Need to
 
121
# consider how we'll handle error reporting, e.g. if we get halfway through a
 
122
# bulk transfer and then something goes wrong.
 
123
 
 
124
# TODO: Standard marker at start of request/response lines?
 
125
 
 
126
# TODO: Make each request and response self-validatable, e.g. with checksums.
 
127
#
 
128
# TODO: get/put objects could be changed to gradually read back the data as it
 
129
# comes across the network
 
130
#
 
131
# TODO: What should the server do if it hits an error and has to terminate?
 
132
#
 
133
# TODO: is it useful to allow multiple chunks in the bulk data?
 
134
#
 
135
# TODO: If we get an exception during transmission of bulk data we can't just
 
136
# emit the exception because it won't be seen.
 
137
#   John proposes:  I think it would be worthwhile to have a header on each
 
138
#   chunk, that indicates it is another chunk. Then you can send an 'error'
 
139
#   chunk as long as you finish the previous chunk.
 
140
#
 
141
# TODO: Clone method on Transport; should work up towards parent directory;
 
142
# unclear how this should be stored or communicated to the server... maybe
 
143
# just pass it on all relevant requests?
 
144
#
 
145
# TODO: Better name than clone() for changing between directories.  How about
 
146
# open_dir or change_dir or chdir?
 
147
#
 
148
# TODO: Is it really good to have the notion of current directory within the
 
149
# connection?  Perhaps all Transports should factor out a common connection
 
150
# from the thing that has the directory context?
 
151
#
 
152
# TODO: Pull more things common to sftp and ssh to a higher level.
 
153
#
 
154
# TODO: The server that manages a connection should be quite small and retain
 
155
# minimum state because each of the requests are supposed to be stateless.
 
156
# Then we can write another implementation that maps to http.
 
157
#
 
158
# TODO: What to do when a client connection is garbage collected?  Maybe just
 
159
# abruptly drop the connection?
 
160
#
 
161
# TODO: Server in some cases will need to restrict access to files outside of
 
162
# a particular root directory.  LocalTransport doesn't do anything to stop you
 
163
# ascending above the base directory, so we need to prevent paths
 
164
# containing '..' in either the server or transport layers.  (Also need to
 
165
# consider what happens if someone creates a symlink pointing outside the 
 
166
# directory tree...)
 
167
#
 
168
# TODO: Server should rebase absolute paths coming across the network to put
 
169
# them under the virtual root, if one is in use.  LocalTransport currently
 
170
# doesn't do that; if you give it an absolute path it just uses it.
 
171
 
172
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
 
173
# urlescape them instead.  Indeed possibly this should just literally be
 
174
# http-over-ssh.
 
175
#
 
176
# FIXME: This transport, with several others, has imperfect handling of paths
 
177
# within urls.  It'd probably be better for ".." from a root to raise an error
 
178
# rather than return the same directory as we do at present.
 
179
#
 
180
# TODO: Rather than working at the Transport layer we want a Branch,
 
181
# Repository or BzrDir objects that talk to a server.
 
182
#
 
183
# TODO: Probably want some way for server commands to gradually produce body
 
184
# data rather than passing it as a string; they could perhaps pass an
 
185
# iterator-like callback that will gradually yield data; it probably needs a
 
186
# close() method that will always be closed to do any necessary cleanup.
 
187
#
 
188
# TODO: Split the actual smart server from the ssh encoding of it.
 
189
#
 
190
# TODO: Perhaps support file-level readwrite operations over the transport
 
191
# too.
 
192
#
 
193
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
 
194
# branch doing file-level operations.
 
195
#
 
196
 
17
197
from cStringIO import StringIO
 
198
import os
 
199
import socket
 
200
import sys
 
201
import tempfile
 
202
import threading
18
203
import urllib
19
204
import urlparse
20
205
 
21
206
from bzrlib import (
 
207
    bzrdir,
22
208
    errors,
 
209
    revision,
23
210
    transport,
 
211
    trace,
 
212
    urlutils,
24
213
    )
25
 
from bzrlib.smart.protocol import SmartClientRequestProtocolOne
26
 
from bzrlib.smart.medium import SmartTCPClientMedium, SmartSSHClientMedium
 
214
from bzrlib.bundle.serializer import write_bundle
 
215
try:
 
216
    from bzrlib.transport import ssh
 
217
except errors.ParamikoNotPresent:
 
218
    # no paramiko.  SmartSSHClientMedium will break.
 
219
    pass
27
220
 
28
221
# must do this otherwise urllib can't parse the urls properly :(
29
222
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh', 'bzr+http']:
35
228
BZR_DEFAULT_PORT = 4155
36
229
 
37
230
 
 
231
def _recv_tuple(from_file):
 
232
    req_line = from_file.readline()
 
233
    return _decode_tuple(req_line)
 
234
 
 
235
 
 
236
def _decode_tuple(req_line):
 
237
    if req_line == None or req_line == '':
 
238
        return None
 
239
    if req_line[-1] != '\n':
 
240
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
 
241
    return tuple(req_line[:-1].split('\x01'))
 
242
 
 
243
 
 
244
def _encode_tuple(args):
 
245
    """Encode the tuple args to a bytestream."""
 
246
    return '\x01'.join(args) + '\n'
 
247
 
 
248
 
 
249
class SmartProtocolBase(object):
 
250
    """Methods common to client and server"""
 
251
 
 
252
    # TODO: this only actually accomodates a single block; possibly should
 
253
    # support multiple chunks?
 
254
    def _encode_bulk_data(self, body):
 
255
        """Encode body as a bulk data chunk."""
 
256
        return ''.join(('%d\n' % len(body), body, 'done\n'))
 
257
 
 
258
    def _serialise_offsets(self, offsets):
 
259
        """Serialise a readv offset list."""
 
260
        txt = []
 
261
        for start, length in offsets:
 
262
            txt.append('%d,%d' % (start, length))
 
263
        return '\n'.join(txt)
 
264
        
 
265
 
 
266
class SmartServerRequestProtocolOne(SmartProtocolBase):
 
267
    """Server-side encoding and decoding logic for smart version 1."""
 
268
    
 
269
    def __init__(self, backing_transport, write_func):
 
270
        self._backing_transport = backing_transport
 
271
        self.excess_buffer = ''
 
272
        self._finished = False
 
273
        self.in_buffer = ''
 
274
        self.has_dispatched = False
 
275
        self.request = None
 
276
        self._body_decoder = None
 
277
        self._write_func = write_func
 
278
 
 
279
    def accept_bytes(self, bytes):
 
280
        """Take bytes, and advance the internal state machine appropriately.
 
281
        
 
282
        :param bytes: must be a byte string
 
283
        """
 
284
        assert isinstance(bytes, str)
 
285
        self.in_buffer += bytes
 
286
        if not self.has_dispatched:
 
287
            if '\n' not in self.in_buffer:
 
288
                # no command line yet
 
289
                return
 
290
            self.has_dispatched = True
 
291
            try:
 
292
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
 
293
                first_line += '\n'
 
294
                req_args = _decode_tuple(first_line)
 
295
                self.request = SmartServerRequestHandler(
 
296
                    self._backing_transport)
 
297
                self.request.dispatch_command(req_args[0], req_args[1:])
 
298
                if self.request.finished_reading:
 
299
                    # trivial request
 
300
                    self.excess_buffer = self.in_buffer
 
301
                    self.in_buffer = ''
 
302
                    self._send_response(self.request.response.args,
 
303
                        self.request.response.body)
 
304
            except KeyboardInterrupt:
 
305
                raise
 
306
            except Exception, exception:
 
307
                # everything else: pass to client, flush, and quit
 
308
                self._send_response(('error', str(exception)))
 
309
                return
 
310
 
 
311
        if self.has_dispatched:
 
312
            if self._finished:
 
313
                # nothing to do.XXX: this routine should be a single state 
 
314
                # machine too.
 
315
                self.excess_buffer += self.in_buffer
 
316
                self.in_buffer = ''
 
317
                return
 
318
            if self._body_decoder is None:
 
319
                self._body_decoder = LengthPrefixedBodyDecoder()
 
320
            self._body_decoder.accept_bytes(self.in_buffer)
 
321
            self.in_buffer = self._body_decoder.unused_data
 
322
            body_data = self._body_decoder.read_pending_data()
 
323
            self.request.accept_body(body_data)
 
324
            if self._body_decoder.finished_reading:
 
325
                self.request.end_of_body()
 
326
                assert self.request.finished_reading, \
 
327
                    "no more body, request not finished"
 
328
            if self.request.response is not None:
 
329
                self._send_response(self.request.response.args,
 
330
                    self.request.response.body)
 
331
                self.excess_buffer = self.in_buffer
 
332
                self.in_buffer = ''
 
333
            else:
 
334
                assert not self.request.finished_reading, \
 
335
                    "no response and we have finished reading."
 
336
 
 
337
    def _send_response(self, args, body=None):
 
338
        """Send a smart server response down the output stream."""
 
339
        assert not self._finished, 'response already sent'
 
340
        self._finished = True
 
341
        self._write_func(_encode_tuple(args))
 
342
        if body is not None:
 
343
            assert isinstance(body, str), 'body must be a str'
 
344
            bytes = self._encode_bulk_data(body)
 
345
            self._write_func(bytes)
 
346
 
 
347
    def next_read_size(self):
 
348
        if self._finished:
 
349
            return 0
 
350
        if self._body_decoder is None:
 
351
            return 1
 
352
        else:
 
353
            return self._body_decoder.next_read_size()
 
354
 
 
355
 
 
356
class LengthPrefixedBodyDecoder(object):
 
357
    """Decodes the length-prefixed bulk data."""
 
358
    
 
359
    def __init__(self):
 
360
        self.bytes_left = None
 
361
        self.finished_reading = False
 
362
        self.unused_data = ''
 
363
        self.state_accept = self._state_accept_expecting_length
 
364
        self.state_read = self._state_read_no_data
 
365
        self._in_buffer = ''
 
366
        self._trailer_buffer = ''
 
367
    
 
368
    def accept_bytes(self, bytes):
 
369
        """Decode as much of bytes as possible.
 
370
 
 
371
        If 'bytes' contains too much data it will be appended to
 
372
        self.unused_data.
 
373
 
 
374
        finished_reading will be set when no more data is required.  Further
 
375
        data will be appended to self.unused_data.
 
376
        """
 
377
        # accept_bytes is allowed to change the state
 
378
        current_state = self.state_accept
 
379
        self.state_accept(bytes)
 
380
        while current_state != self.state_accept:
 
381
            current_state = self.state_accept
 
382
            self.state_accept('')
 
383
 
 
384
    def next_read_size(self):
 
385
        if self.bytes_left is not None:
 
386
            # Ideally we want to read all the remainder of the body and the
 
387
            # trailer in one go.
 
388
            return self.bytes_left + 5
 
389
        elif self.state_accept == self._state_accept_reading_trailer:
 
390
            # Just the trailer left
 
391
            return 5 - len(self._trailer_buffer)
 
392
        elif self.state_accept == self._state_accept_expecting_length:
 
393
            # There's still at least 6 bytes left ('\n' to end the length, plus
 
394
            # 'done\n').
 
395
            return 6
 
396
        else:
 
397
            # Reading excess data.  Either way, 1 byte at a time is fine.
 
398
            return 1
 
399
        
 
400
    def read_pending_data(self):
 
401
        """Return any pending data that has been decoded."""
 
402
        return self.state_read()
 
403
 
 
404
    def _state_accept_expecting_length(self, bytes):
 
405
        self._in_buffer += bytes
 
406
        pos = self._in_buffer.find('\n')
 
407
        if pos == -1:
 
408
            return
 
409
        self.bytes_left = int(self._in_buffer[:pos])
 
410
        self._in_buffer = self._in_buffer[pos+1:]
 
411
        self.bytes_left -= len(self._in_buffer)
 
412
        self.state_accept = self._state_accept_reading_body
 
413
        self.state_read = self._state_read_in_buffer
 
414
 
 
415
    def _state_accept_reading_body(self, bytes):
 
416
        self._in_buffer += bytes
 
417
        self.bytes_left -= len(bytes)
 
418
        if self.bytes_left <= 0:
 
419
            # Finished with body
 
420
            if self.bytes_left != 0:
 
421
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
 
422
                self._in_buffer = self._in_buffer[:self.bytes_left]
 
423
            self.bytes_left = None
 
424
            self.state_accept = self._state_accept_reading_trailer
 
425
        
 
426
    def _state_accept_reading_trailer(self, bytes):
 
427
        self._trailer_buffer += bytes
 
428
        # TODO: what if the trailer does not match "done\n"?  Should this raise
 
429
        # a ProtocolViolation exception?
 
430
        if self._trailer_buffer.startswith('done\n'):
 
431
            self.unused_data = self._trailer_buffer[len('done\n'):]
 
432
            self.state_accept = self._state_accept_reading_unused
 
433
            self.finished_reading = True
 
434
    
 
435
    def _state_accept_reading_unused(self, bytes):
 
436
        self.unused_data += bytes
 
437
 
 
438
    def _state_read_no_data(self):
 
439
        return ''
 
440
 
 
441
    def _state_read_in_buffer(self):
 
442
        result = self._in_buffer
 
443
        self._in_buffer = ''
 
444
        return result
 
445
 
 
446
 
 
447
class SmartServerStreamMedium(object):
 
448
    """Handles smart commands coming over a stream.
 
449
 
 
450
    The stream may be a pipe connected to sshd, or a tcp socket, or an
 
451
    in-process fifo for testing.
 
452
 
 
453
    One instance is created for each connected client; it can serve multiple
 
454
    requests in the lifetime of the connection.
 
455
 
 
456
    The server passes requests through to an underlying backing transport, 
 
457
    which will typically be a LocalTransport looking at the server's filesystem.
 
458
    """
 
459
 
 
460
    def __init__(self, backing_transport):
 
461
        """Construct new server.
 
462
 
 
463
        :param backing_transport: Transport for the directory served.
 
464
        """
 
465
        # backing_transport could be passed to serve instead of __init__
 
466
        self.backing_transport = backing_transport
 
467
        self.finished = False
 
468
 
 
469
    def serve(self):
 
470
        """Serve requests until the client disconnects."""
 
471
        # Keep a reference to stderr because the sys module's globals get set to
 
472
        # None during interpreter shutdown.
 
473
        from sys import stderr
 
474
        try:
 
475
            while not self.finished:
 
476
                protocol = SmartServerRequestProtocolOne(self.backing_transport,
 
477
                                                         self._write_out)
 
478
                self._serve_one_request(protocol)
 
479
        except Exception, e:
 
480
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
481
            raise
 
482
 
 
483
    def _serve_one_request(self, protocol):
 
484
        """Read one request from input, process, send back a response.
 
485
        
 
486
        :param protocol: a SmartServerRequestProtocol.
 
487
        """
 
488
        try:
 
489
            self._serve_one_request_unguarded(protocol)
 
490
        except KeyboardInterrupt:
 
491
            raise
 
492
        except Exception, e:
 
493
            self.terminate_due_to_error()
 
494
 
 
495
    def terminate_due_to_error(self):
 
496
        """Called when an unhandled exception from the protocol occurs."""
 
497
        raise NotImplementedError(self.terminate_due_to_error)
 
498
 
 
499
 
 
500
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
 
501
 
 
502
    def __init__(self, sock, backing_transport):
 
503
        """Constructor.
 
504
 
 
505
        :param sock: the socket the server will read from.  It will be put
 
506
            into blocking mode.
 
507
        """
 
508
        SmartServerStreamMedium.__init__(self, backing_transport)
 
509
        self.push_back = ''
 
510
        sock.setblocking(True)
 
511
        self.socket = sock
 
512
 
 
513
    def _serve_one_request_unguarded(self, protocol):
 
514
        while protocol.next_read_size():
 
515
            if self.push_back:
 
516
                protocol.accept_bytes(self.push_back)
 
517
                self.push_back = ''
 
518
            else:
 
519
                bytes = self.socket.recv(4096)
 
520
                if bytes == '':
 
521
                    self.finished = True
 
522
                    return
 
523
                protocol.accept_bytes(bytes)
 
524
        
 
525
        self.push_back = protocol.excess_buffer
 
526
    
 
527
    def terminate_due_to_error(self):
 
528
        """Called when an unhandled exception from the protocol occurs."""
 
529
        # TODO: This should log to a server log file, but no such thing
 
530
        # exists yet.  Andrew Bennetts 2006-09-29.
 
531
        self.socket.close()
 
532
        self.finished = True
 
533
 
 
534
    def _write_out(self, bytes):
 
535
        self.socket.sendall(bytes)
 
536
 
 
537
 
 
538
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
 
539
 
 
540
    def __init__(self, in_file, out_file, backing_transport):
 
541
        """Construct new server.
 
542
 
 
543
        :param in_file: Python file from which requests can be read.
 
544
        :param out_file: Python file to write responses.
 
545
        :param backing_transport: Transport for the directory served.
 
546
        """
 
547
        SmartServerStreamMedium.__init__(self, backing_transport)
 
548
        if sys.platform == 'win32':
 
549
            # force binary mode for files
 
550
            import msvcrt
 
551
            for f in (in_file, out_file):
 
552
                fileno = getattr(f, 'fileno', None)
 
553
                if fileno:
 
554
                    msvcrt.setmode(fileno(), os.O_BINARY)
 
555
        self._in = in_file
 
556
        self._out = out_file
 
557
 
 
558
    def _serve_one_request_unguarded(self, protocol):
 
559
        while True:
 
560
            bytes_to_read = protocol.next_read_size()
 
561
            if bytes_to_read == 0:
 
562
                # Finished serving this request.
 
563
                self._out.flush()
 
564
                return
 
565
            bytes = self._in.read(bytes_to_read)
 
566
            if bytes == '':
 
567
                # Connection has been closed.
 
568
                self.finished = True
 
569
                self._out.flush()
 
570
                return
 
571
            protocol.accept_bytes(bytes)
 
572
 
 
573
    def terminate_due_to_error(self):
 
574
        # TODO: This should log to a server log file, but no such thing
 
575
        # exists yet.  Andrew Bennetts 2006-09-29.
 
576
        self._out.close()
 
577
        self.finished = True
 
578
 
 
579
    def _write_out(self, bytes):
 
580
        self._out.write(bytes)
 
581
 
 
582
 
 
583
class SmartServerResponse(object):
 
584
    """Response generated by SmartServerRequestHandler."""
 
585
 
 
586
    def __init__(self, args, body=None):
 
587
        self.args = args
 
588
        self.body = body
 
589
 
 
590
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
 
591
# for delivering the data for a request. This could be done with as the
 
592
# StreamServer, though that would create conflation between request and response
 
593
# which may be undesirable.
 
594
 
 
595
 
 
596
class SmartServerRequestHandler(object):
 
597
    """Protocol logic for smart server.
 
598
    
 
599
    This doesn't handle serialization at all, it just processes requests and
 
600
    creates responses.
 
601
    """
 
602
 
 
603
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
 
604
    # not contain encoding or decoding logic to allow the wire protocol to vary
 
605
    # from the object protocol: we will want to tweak the wire protocol separate
 
606
    # from the object model, and ideally we will be able to do that without
 
607
    # having a SmartServerRequestHandler subclass for each wire protocol, rather
 
608
    # just a Protocol subclass.
 
609
 
 
610
    # TODO: Better way of representing the body for commands that take it,
 
611
    # and allow it to be streamed into the server.
 
612
    
 
613
    def __init__(self, backing_transport):
 
614
        self._backing_transport = backing_transport
 
615
        self._converted_command = False
 
616
        self.finished_reading = False
 
617
        self._body_bytes = ''
 
618
        self.response = None
 
619
 
 
620
    def accept_body(self, bytes):
 
621
        """Accept body data.
 
622
 
 
623
        This should be overriden for each command that desired body data to
 
624
        handle the right format of that data. I.e. plain bytes, a bundle etc.
 
625
 
 
626
        The deserialisation into that format should be done in the Protocol
 
627
        object. Set self.desired_body_format to the format your method will
 
628
        handle.
 
629
        """
 
630
        # default fallback is to accumulate bytes.
 
631
        self._body_bytes += bytes
 
632
        
 
633
    def _end_of_body_handler(self):
 
634
        """An unimplemented end of body handler."""
 
635
        raise NotImplementedError(self._end_of_body_handler)
 
636
        
 
637
    def do_hello(self):
 
638
        """Answer a version request with my version."""
 
639
        return SmartServerResponse(('ok', '1'))
 
640
 
 
641
    def do_has(self, relpath):
 
642
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
 
643
        return SmartServerResponse((r,))
 
644
 
 
645
    def do_get(self, relpath):
 
646
        backing_bytes = self._backing_transport.get_bytes(relpath)
 
647
        return SmartServerResponse(('ok',), backing_bytes)
 
648
 
 
649
    def _deserialise_optional_mode(self, mode):
 
650
        # XXX: FIXME this should be on the protocol object.
 
651
        if mode == '':
 
652
            return None
 
653
        else:
 
654
            return int(mode)
 
655
 
 
656
    def do_append(self, relpath, mode):
 
657
        self._converted_command = True
 
658
        self._relpath = relpath
 
659
        self._mode = self._deserialise_optional_mode(mode)
 
660
        self._end_of_body_handler = self._handle_do_append_end
 
661
    
 
662
    def _handle_do_append_end(self):
 
663
        old_length = self._backing_transport.append_bytes(
 
664
            self._relpath, self._body_bytes, self._mode)
 
665
        self.response = SmartServerResponse(('appended', '%d' % old_length))
 
666
 
 
667
    def do_delete(self, relpath):
 
668
        self._backing_transport.delete(relpath)
 
669
 
 
670
    def do_iter_files_recursive(self, relpath):
 
671
        transport = self._backing_transport.clone(relpath)
 
672
        filenames = transport.iter_files_recursive()
 
673
        return SmartServerResponse(('names',) + tuple(filenames))
 
674
 
 
675
    def do_list_dir(self, relpath):
 
676
        filenames = self._backing_transport.list_dir(relpath)
 
677
        return SmartServerResponse(('names',) + tuple(filenames))
 
678
 
 
679
    def do_mkdir(self, relpath, mode):
 
680
        self._backing_transport.mkdir(relpath,
 
681
                                      self._deserialise_optional_mode(mode))
 
682
 
 
683
    def do_move(self, rel_from, rel_to):
 
684
        self._backing_transport.move(rel_from, rel_to)
 
685
 
 
686
    def do_put(self, relpath, mode):
 
687
        self._converted_command = True
 
688
        self._relpath = relpath
 
689
        self._mode = self._deserialise_optional_mode(mode)
 
690
        self._end_of_body_handler = self._handle_do_put
 
691
 
 
692
    def _handle_do_put(self):
 
693
        self._backing_transport.put_bytes(self._relpath,
 
694
                self._body_bytes, self._mode)
 
695
        self.response = SmartServerResponse(('ok',))
 
696
 
 
697
    def _deserialise_offsets(self, text):
 
698
        # XXX: FIXME this should be on the protocol object.
 
699
        offsets = []
 
700
        for line in text.split('\n'):
 
701
            if not line:
 
702
                continue
 
703
            start, length = line.split(',')
 
704
            offsets.append((int(start), int(length)))
 
705
        return offsets
 
706
 
 
707
    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
 
708
        self._converted_command = True
 
709
        self._end_of_body_handler = self._handle_put_non_atomic
 
710
        self._relpath = relpath
 
711
        self._dir_mode = self._deserialise_optional_mode(dir_mode)
 
712
        self._mode = self._deserialise_optional_mode(mode)
 
713
        # a boolean would be nicer XXX
 
714
        self._create_parent = (create_parent == 'T')
 
715
 
 
716
    def _handle_put_non_atomic(self):
 
717
        self._backing_transport.put_bytes_non_atomic(self._relpath,
 
718
                self._body_bytes,
 
719
                mode=self._mode,
 
720
                create_parent_dir=self._create_parent,
 
721
                dir_mode=self._dir_mode)
 
722
        self.response = SmartServerResponse(('ok',))
 
723
 
 
724
    def do_readv(self, relpath):
 
725
        self._converted_command = True
 
726
        self._end_of_body_handler = self._handle_readv_offsets
 
727
        self._relpath = relpath
 
728
 
 
729
    def end_of_body(self):
 
730
        """No more body data will be received."""
 
731
        self._run_handler_code(self._end_of_body_handler, (), {})
 
732
        # cannot read after this.
 
733
        self.finished_reading = True
 
734
 
 
735
    def _handle_readv_offsets(self):
 
736
        """accept offsets for a readv request."""
 
737
        offsets = self._deserialise_offsets(self._body_bytes)
 
738
        backing_bytes = ''.join(bytes for offset, bytes in
 
739
            self._backing_transport.readv(self._relpath, offsets))
 
740
        self.response = SmartServerResponse(('readv',), backing_bytes)
 
741
        
 
742
    def do_rename(self, rel_from, rel_to):
 
743
        self._backing_transport.rename(rel_from, rel_to)
 
744
 
 
745
    def do_rmdir(self, relpath):
 
746
        self._backing_transport.rmdir(relpath)
 
747
 
 
748
    def do_stat(self, relpath):
 
749
        stat = self._backing_transport.stat(relpath)
 
750
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
 
751
        
 
752
    def do_get_bundle(self, path, revision_id):
 
753
        # open transport relative to our base
 
754
        t = self._backing_transport.clone(path)
 
755
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
 
756
        repo = control.open_repository()
 
757
        tmpf = tempfile.TemporaryFile()
 
758
        base_revision = revision.NULL_REVISION
 
759
        write_bundle(repo, revision_id, base_revision, tmpf)
 
760
        tmpf.seek(0)
 
761
        return SmartServerResponse((), tmpf.read())
 
762
 
 
763
    def dispatch_command(self, cmd, args):
 
764
        """Deprecated compatibility method.""" # XXX XXX
 
765
        func = getattr(self, 'do_' + cmd, None)
 
766
        if func is None:
 
767
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
 
768
        self._run_handler_code(func, args, {})
 
769
 
 
770
    def _run_handler_code(self, callable, args, kwargs):
 
771
        """Run some handler specific code 'callable'.
 
772
 
 
773
        If a result is returned, it is considered to be the commands response,
 
774
        and finished_reading is set true, and its assigned to self.response.
 
775
 
 
776
        Any exceptions caught are translated and a response object created
 
777
        from them.
 
778
        """
 
779
        result = self._call_converting_errors(callable, args, kwargs)
 
780
        if result is not None:
 
781
            self.response = result
 
782
            self.finished_reading = True
 
783
        # handle unconverted commands
 
784
        if not self._converted_command:
 
785
            self.finished_reading = True
 
786
            if result is None:
 
787
                self.response = SmartServerResponse(('ok',))
 
788
 
 
789
    def _call_converting_errors(self, callable, args, kwargs):
 
790
        """Call callable converting errors to Response objects."""
 
791
        try:
 
792
            return callable(*args, **kwargs)
 
793
        except errors.NoSuchFile, e:
 
794
            return SmartServerResponse(('NoSuchFile', e.path))
 
795
        except errors.FileExists, e:
 
796
            return SmartServerResponse(('FileExists', e.path))
 
797
        except errors.DirectoryNotEmpty, e:
 
798
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
 
799
        except errors.ShortReadvError, e:
 
800
            return SmartServerResponse(('ShortReadvError',
 
801
                e.path, str(e.offset), str(e.length), str(e.actual)))
 
802
        except UnicodeError, e:
 
803
            # If it is a DecodeError, than most likely we are starting
 
804
            # with a plain string
 
805
            str_or_unicode = e.object
 
806
            if isinstance(str_or_unicode, unicode):
 
807
                # XXX: UTF-8 might have \x01 (our seperator byte) in it.  We
 
808
                # should escape it somehow.
 
809
                val = 'u:' + str_or_unicode.encode('utf-8')
 
810
            else:
 
811
                val = 's:' + str_or_unicode.encode('base64')
 
812
            # This handles UnicodeEncodeError or UnicodeDecodeError
 
813
            return SmartServerResponse((e.__class__.__name__,
 
814
                    e.encoding, val, str(e.start), str(e.end), e.reason))
 
815
        except errors.TransportNotPossible, e:
 
816
            if e.msg == "readonly transport":
 
817
                return SmartServerResponse(('ReadOnlyError', ))
 
818
            else:
 
819
                raise
 
820
 
 
821
 
 
822
class SmartTCPServer(object):
 
823
    """Listens on a TCP socket and accepts connections from smart clients"""
 
824
 
 
825
    def __init__(self, backing_transport, host='127.0.0.1', port=0):
 
826
        """Construct a new server.
 
827
 
 
828
        To actually start it running, call either start_background_thread or
 
829
        serve.
 
830
 
 
831
        :param host: Name of the interface to listen on.
 
832
        :param port: TCP port to listen on, or 0 to allocate a transient port.
 
833
        """
 
834
        self._server_socket = socket.socket()
 
835
        self._server_socket.bind((host, port))
 
836
        self.port = self._server_socket.getsockname()[1]
 
837
        self._server_socket.listen(1)
 
838
        self._server_socket.settimeout(1)
 
839
        self.backing_transport = backing_transport
 
840
 
 
841
    def serve(self):
 
842
        # let connections timeout so that we get a chance to terminate
 
843
        # Keep a reference to the exceptions we want to catch because the socket
 
844
        # module's globals get set to None during interpreter shutdown.
 
845
        from socket import timeout as socket_timeout
 
846
        from socket import error as socket_error
 
847
        self._should_terminate = False
 
848
        while not self._should_terminate:
 
849
            try:
 
850
                self.accept_and_serve()
 
851
            except socket_timeout:
 
852
                # just check if we're asked to stop
 
853
                pass
 
854
            except socket_error, e:
 
855
                trace.warning("client disconnected: %s", e)
 
856
                pass
 
857
 
 
858
    def get_url(self):
 
859
        """Return the url of the server"""
 
860
        return "bzr://%s:%d/" % self._server_socket.getsockname()
 
861
 
 
862
    def accept_and_serve(self):
 
863
        conn, client_addr = self._server_socket.accept()
 
864
        # For WIN32, where the timeout value from the listening socket
 
865
        # propogates to the newly accepted socket.
 
866
        conn.setblocking(True)
 
867
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
868
        handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
 
869
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
 
870
        connection_thread.setDaemon(True)
 
871
        connection_thread.start()
 
872
 
 
873
    def start_background_thread(self):
 
874
        self._server_thread = threading.Thread(None,
 
875
                self.serve,
 
876
                name='server-' + self.get_url())
 
877
        self._server_thread.setDaemon(True)
 
878
        self._server_thread.start()
 
879
 
 
880
    def stop_background_thread(self):
 
881
        self._should_terminate = True
 
882
        # At one point we would wait to join the threads here, but it looks
 
883
        # like they don't actually exit.  So now we just leave them running
 
884
        # and expect to terminate the process. -- mbp 20070215
 
885
        # self._server_socket.close()
 
886
        ## sys.stderr.write("waiting for server thread to finish...")
 
887
        ## self._server_thread.join()
 
888
 
 
889
 
 
890
class SmartTCPServer_for_testing(SmartTCPServer):
 
891
    """Server suitable for use by transport tests.
 
892
    
 
893
    This server is backed by the process's cwd.
 
894
    """
 
895
 
 
896
    def __init__(self):
 
897
        self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
 
898
        # The server is set up by default like for ssh access: the client
 
899
        # passes filesystem-absolute paths; therefore the server must look
 
900
        # them up relative to the root directory.  it might be better to act
 
901
        # a public server and have the server rewrite paths into the test
 
902
        # directory.
 
903
        SmartTCPServer.__init__(self,
 
904
            transport.get_transport(urlutils.local_path_to_url('/')))
 
905
        
 
906
    def get_backing_transport(self, backing_transport_server):
 
907
        """Get a backing transport from a server we are decorating."""
 
908
        return transport.get_transport(backing_transport_server.get_url())
 
909
 
 
910
    def setUp(self, backing_transport_server=None):
 
911
        """Set up server for testing"""
 
912
        from bzrlib.transport.chroot import TestingChrootServer
 
913
        if backing_transport_server is None:
 
914
            from bzrlib.transport.local import LocalURLServer
 
915
            backing_transport_server = LocalURLServer()
 
916
        self.chroot_server = TestingChrootServer()
 
917
        self.chroot_server.setUp(backing_transport_server)
 
918
        self.backing_transport = transport.get_transport(
 
919
            self.chroot_server.get_url())
 
920
        self.start_background_thread()
 
921
 
 
922
    def tearDown(self):
 
923
        self.stop_background_thread()
 
924
 
 
925
    def get_bogus_url(self):
 
926
        """Return a URL which will fail to connect"""
 
927
        return 'bzr://127.0.0.1:1/'
 
928
 
 
929
 
38
930
class SmartStat(object):
39
931
 
40
932
    def __init__(self, size, mode):
42
934
        self.st_mode = mode
43
935
 
44
936
 
45
 
class RemoteTransport(transport.Transport):
 
937
class SmartTransport(transport.Transport):
46
938
    """Connection to a smart server.
47
939
 
48
940
    The connection holds references to pipes that can be used to send requests
56
948
 
57
949
    The connection can be made over a tcp socket, or (in future) an ssh pipe
58
950
    or a series of http requests.  There are concrete subclasses for each
59
 
    type: RemoteTCPTransport, etc.
 
951
    type: SmartTCPTransport, etc.
60
952
    """
61
953
 
62
 
    # IMPORTANT FOR IMPLEMENTORS: RemoteTransport MUST NOT be given encoding
 
954
    # IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
63
955
    # responsibilities: Put those on SmartClient or similar. This is vital for
64
956
    # the ability to support multiple versions of the smart protocol over time:
65
 
    # RemoteTransport is an adapter from the Transport object model to the 
 
957
    # SmartTransport is an adapter from the Transport object model to the 
66
958
    # SmartClient model, not an encoder.
67
959
 
68
960
    def __init__(self, url, clone_from=None, medium=None):
76
968
        ### initialisation order things would blow up. 
77
969
        if not url.endswith('/'):
78
970
            url += '/'
79
 
        super(RemoteTransport, self).__init__(url)
 
971
        super(SmartTransport, self).__init__(url)
80
972
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
81
973
                transport.split_url(url)
82
974
        if clone_from is None:
98
990
        return self._unparse_url(self._remote_path(relpath))
99
991
    
100
992
    def clone(self, relative_url):
101
 
        """Make a new RemoteTransport related to me, sharing the same connection.
 
993
        """Make a new SmartTransport related to me, sharing the same connection.
102
994
 
103
995
        This essentially opens a handle on a different remote directory.
104
996
        """
105
997
        if relative_url is None:
106
 
            return RemoteTransport(self.base, self)
 
998
            return SmartTransport(self.base, self)
107
999
        else:
108
 
            return RemoteTransport(self.abspath(relative_url), self)
 
1000
            return SmartTransport(self.abspath(relative_url), self)
109
1001
 
110
1002
    def is_readonly(self):
111
1003
        """Smart server transport can do read/write file operations."""
398
1290
            self._translate_error(resp)
399
1291
 
400
1292
 
401
 
 
402
 
class RemoteTCPTransport(RemoteTransport):
 
1293
class SmartClientMediumRequest(object):
 
1294
    """A request on a SmartClientMedium.
 
1295
 
 
1296
    Each request allows bytes to be provided to it via accept_bytes, and then
 
1297
    the response bytes to be read via read_bytes.
 
1298
 
 
1299
    For instance:
 
1300
    request.accept_bytes('123')
 
1301
    request.finished_writing()
 
1302
    result = request.read_bytes(3)
 
1303
    request.finished_reading()
 
1304
 
 
1305
    It is up to the individual SmartClientMedium whether multiple concurrent
 
1306
    requests can exist. See SmartClientMedium.get_request to obtain instances 
 
1307
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
1308
    details on concurrency and pipelining.
 
1309
    """
 
1310
 
 
1311
    def __init__(self, medium):
 
1312
        """Construct a SmartClientMediumRequest for the medium medium."""
 
1313
        self._medium = medium
 
1314
        # we track state by constants - we may want to use the same
 
1315
        # pattern as BodyReader if it gets more complex.
 
1316
        # valid states are: "writing", "reading", "done"
 
1317
        self._state = "writing"
 
1318
 
 
1319
    def accept_bytes(self, bytes):
 
1320
        """Accept bytes for inclusion in this request.
 
1321
 
 
1322
        This method may not be be called after finished_writing() has been
 
1323
        called.  It depends upon the Medium whether or not the bytes will be
 
1324
        immediately transmitted. Message based Mediums will tend to buffer the
 
1325
        bytes until finished_writing() is called.
 
1326
 
 
1327
        :param bytes: A bytestring.
 
1328
        """
 
1329
        if self._state != "writing":
 
1330
            raise errors.WritingCompleted(self)
 
1331
        self._accept_bytes(bytes)
 
1332
 
 
1333
    def _accept_bytes(self, bytes):
 
1334
        """Helper for accept_bytes.
 
1335
 
 
1336
        Accept_bytes checks the state of the request to determing if bytes
 
1337
        should be accepted. After that it hands off to _accept_bytes to do the
 
1338
        actual acceptance.
 
1339
        """
 
1340
        raise NotImplementedError(self._accept_bytes)
 
1341
 
 
1342
    def finished_reading(self):
 
1343
        """Inform the request that all desired data has been read.
 
1344
 
 
1345
        This will remove the request from the pipeline for its medium (if the
 
1346
        medium supports pipelining) and any further calls to methods on the
 
1347
        request will raise ReadingCompleted.
 
1348
        """
 
1349
        if self._state == "writing":
 
1350
            raise errors.WritingNotComplete(self)
 
1351
        if self._state != "reading":
 
1352
            raise errors.ReadingCompleted(self)
 
1353
        self._state = "done"
 
1354
        self._finished_reading()
 
1355
 
 
1356
    def _finished_reading(self):
 
1357
        """Helper for finished_reading.
 
1358
 
 
1359
        finished_reading checks the state of the request to determine if 
 
1360
        finished_reading is allowed, and if it is hands off to _finished_reading
 
1361
        to perform the action.
 
1362
        """
 
1363
        raise NotImplementedError(self._finished_reading)
 
1364
 
 
1365
    def finished_writing(self):
 
1366
        """Finish the writing phase of this request.
 
1367
 
 
1368
        This will flush all pending data for this request along the medium.
 
1369
        After calling finished_writing, you may not call accept_bytes anymore.
 
1370
        """
 
1371
        if self._state != "writing":
 
1372
            raise errors.WritingCompleted(self)
 
1373
        self._state = "reading"
 
1374
        self._finished_writing()
 
1375
 
 
1376
    def _finished_writing(self):
 
1377
        """Helper for finished_writing.
 
1378
 
 
1379
        finished_writing checks the state of the request to determine if 
 
1380
        finished_writing is allowed, and if it is hands off to _finished_writing
 
1381
        to perform the action.
 
1382
        """
 
1383
        raise NotImplementedError(self._finished_writing)
 
1384
 
 
1385
    def read_bytes(self, count):
 
1386
        """Read bytes from this requests response.
 
1387
 
 
1388
        This method will block and wait for count bytes to be read. It may not
 
1389
        be invoked until finished_writing() has been called - this is to ensure
 
1390
        a message-based approach to requests, for compatability with message
 
1391
        based mediums like HTTP.
 
1392
        """
 
1393
        if self._state == "writing":
 
1394
            raise errors.WritingNotComplete(self)
 
1395
        if self._state != "reading":
 
1396
            raise errors.ReadingCompleted(self)
 
1397
        return self._read_bytes(count)
 
1398
 
 
1399
    def _read_bytes(self, count):
 
1400
        """Helper for read_bytes.
 
1401
 
 
1402
        read_bytes checks the state of the request to determing if bytes
 
1403
        should be read. After that it hands off to _read_bytes to do the
 
1404
        actual read.
 
1405
        """
 
1406
        raise NotImplementedError(self._read_bytes)
 
1407
 
 
1408
 
 
1409
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
 
1410
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
 
1411
 
 
1412
    def __init__(self, medium):
 
1413
        SmartClientMediumRequest.__init__(self, medium)
 
1414
        # check that we are safe concurrency wise. If some streams start
 
1415
        # allowing concurrent requests - i.e. via multiplexing - then this
 
1416
        # assert should be moved to SmartClientStreamMedium.get_request,
 
1417
        # and the setting/unsetting of _current_request likewise moved into
 
1418
        # that class : but its unneeded overhead for now. RBC 20060922
 
1419
        if self._medium._current_request is not None:
 
1420
            raise errors.TooManyConcurrentRequests(self._medium)
 
1421
        self._medium._current_request = self
 
1422
 
 
1423
    def _accept_bytes(self, bytes):
 
1424
        """See SmartClientMediumRequest._accept_bytes.
 
1425
        
 
1426
        This forwards to self._medium._accept_bytes because we are operating
 
1427
        on the mediums stream.
 
1428
        """
 
1429
        self._medium._accept_bytes(bytes)
 
1430
 
 
1431
    def _finished_reading(self):
 
1432
        """See SmartClientMediumRequest._finished_reading.
 
1433
 
 
1434
        This clears the _current_request on self._medium to allow a new 
 
1435
        request to be created.
 
1436
        """
 
1437
        assert self._medium._current_request is self
 
1438
        self._medium._current_request = None
 
1439
        
 
1440
    def _finished_writing(self):
 
1441
        """See SmartClientMediumRequest._finished_writing.
 
1442
 
 
1443
        This invokes self._medium._flush to ensure all bytes are transmitted.
 
1444
        """
 
1445
        self._medium._flush()
 
1446
 
 
1447
    def _read_bytes(self, count):
 
1448
        """See SmartClientMediumRequest._read_bytes.
 
1449
        
 
1450
        This forwards to self._medium._read_bytes because we are operating
 
1451
        on the mediums stream.
 
1452
        """
 
1453
        return self._medium._read_bytes(count)
 
1454
 
 
1455
 
 
1456
class SmartClientRequestProtocolOne(SmartProtocolBase):
 
1457
    """The client-side protocol for smart version 1."""
 
1458
 
 
1459
    def __init__(self, request):
 
1460
        """Construct a SmartClientRequestProtocolOne.
 
1461
 
 
1462
        :param request: A SmartClientMediumRequest to serialise onto and
 
1463
            deserialise from.
 
1464
        """
 
1465
        self._request = request
 
1466
        self._body_buffer = None
 
1467
 
 
1468
    def call(self, *args):
 
1469
        bytes = _encode_tuple(args)
 
1470
        self._request.accept_bytes(bytes)
 
1471
        self._request.finished_writing()
 
1472
 
 
1473
    def call_with_body_bytes(self, args, body):
 
1474
        """Make a remote call of args with body bytes 'body'.
 
1475
 
 
1476
        After calling this, call read_response_tuple to find the result out.
 
1477
        """
 
1478
        bytes = _encode_tuple(args)
 
1479
        self._request.accept_bytes(bytes)
 
1480
        bytes = self._encode_bulk_data(body)
 
1481
        self._request.accept_bytes(bytes)
 
1482
        self._request.finished_writing()
 
1483
 
 
1484
    def call_with_body_readv_array(self, args, body):
 
1485
        """Make a remote call with a readv array.
 
1486
 
 
1487
        The body is encoded with one line per readv offset pair. The numbers in
 
1488
        each pair are separated by a comma, and no trailing \n is emitted.
 
1489
        """
 
1490
        bytes = _encode_tuple(args)
 
1491
        self._request.accept_bytes(bytes)
 
1492
        readv_bytes = self._serialise_offsets(body)
 
1493
        bytes = self._encode_bulk_data(readv_bytes)
 
1494
        self._request.accept_bytes(bytes)
 
1495
        self._request.finished_writing()
 
1496
 
 
1497
    def cancel_read_body(self):
 
1498
        """After expecting a body, a response code may indicate one otherwise.
 
1499
 
 
1500
        This method lets the domain client inform the protocol that no body
 
1501
        will be transmitted. This is a terminal method: after calling it the
 
1502
        protocol is not able to be used further.
 
1503
        """
 
1504
        self._request.finished_reading()
 
1505
 
 
1506
    def read_response_tuple(self, expect_body=False):
 
1507
        """Read a response tuple from the wire.
 
1508
 
 
1509
        This should only be called once.
 
1510
        """
 
1511
        result = self._recv_tuple()
 
1512
        if not expect_body:
 
1513
            self._request.finished_reading()
 
1514
        return result
 
1515
 
 
1516
    def read_body_bytes(self, count=-1):
 
1517
        """Read bytes from the body, decoding into a byte stream.
 
1518
        
 
1519
        We read all bytes at once to ensure we've checked the trailer for 
 
1520
        errors, and then feed the buffer back as read_body_bytes is called.
 
1521
        """
 
1522
        if self._body_buffer is not None:
 
1523
            return self._body_buffer.read(count)
 
1524
        _body_decoder = LengthPrefixedBodyDecoder()
 
1525
 
 
1526
        while not _body_decoder.finished_reading:
 
1527
            bytes_wanted = _body_decoder.next_read_size()
 
1528
            bytes = self._request.read_bytes(bytes_wanted)
 
1529
            _body_decoder.accept_bytes(bytes)
 
1530
        self._request.finished_reading()
 
1531
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
 
1532
        # XXX: TODO check the trailer result.
 
1533
        return self._body_buffer.read(count)
 
1534
 
 
1535
    def _recv_tuple(self):
 
1536
        """Receive a tuple from the medium request."""
 
1537
        line = ''
 
1538
        while not line or line[-1] != '\n':
 
1539
            # TODO: this is inefficient - but tuples are short.
 
1540
            new_char = self._request.read_bytes(1)
 
1541
            line += new_char
 
1542
            assert new_char != '', "end of file reading from server."
 
1543
        return _decode_tuple(line)
 
1544
 
 
1545
    def query_version(self):
 
1546
        """Return protocol version number of the server."""
 
1547
        self.call('hello')
 
1548
        resp = self.read_response_tuple()
 
1549
        if resp == ('ok', '1'):
 
1550
            return 1
 
1551
        else:
 
1552
            raise errors.SmartProtocolError("bad response %r" % (resp,))
 
1553
 
 
1554
 
 
1555
class SmartClientMedium(object):
 
1556
    """Smart client is a medium for sending smart protocol requests over."""
 
1557
 
 
1558
    def disconnect(self):
 
1559
        """If this medium maintains a persistent connection, close it.
 
1560
        
 
1561
        The default implementation does nothing.
 
1562
        """
 
1563
        
 
1564
 
 
1565
class SmartClientStreamMedium(SmartClientMedium):
 
1566
    """Stream based medium common class.
 
1567
 
 
1568
    SmartClientStreamMediums operate on a stream. All subclasses use a common
 
1569
    SmartClientStreamMediumRequest for their requests, and should implement
 
1570
    _accept_bytes and _read_bytes to allow the request objects to send and
 
1571
    receive bytes.
 
1572
    """
 
1573
 
 
1574
    def __init__(self):
 
1575
        self._current_request = None
 
1576
 
 
1577
    def accept_bytes(self, bytes):
 
1578
        self._accept_bytes(bytes)
 
1579
 
 
1580
    def __del__(self):
 
1581
        """The SmartClientStreamMedium knows how to close the stream when it is
 
1582
        finished with it.
 
1583
        """
 
1584
        self.disconnect()
 
1585
 
 
1586
    def _flush(self):
 
1587
        """Flush the output stream.
 
1588
        
 
1589
        This method is used by the SmartClientStreamMediumRequest to ensure that
 
1590
        all data for a request is sent, to avoid long timeouts or deadlocks.
 
1591
        """
 
1592
        raise NotImplementedError(self._flush)
 
1593
 
 
1594
    def get_request(self):
 
1595
        """See SmartClientMedium.get_request().
 
1596
 
 
1597
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
 
1598
        for get_request.
 
1599
        """
 
1600
        return SmartClientStreamMediumRequest(self)
 
1601
 
 
1602
    def read_bytes(self, count):
 
1603
        return self._read_bytes(count)
 
1604
 
 
1605
 
 
1606
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
 
1607
    """A client medium using simple pipes.
 
1608
    
 
1609
    This client does not manage the pipes: it assumes they will always be open.
 
1610
    """
 
1611
 
 
1612
    def __init__(self, readable_pipe, writeable_pipe):
 
1613
        SmartClientStreamMedium.__init__(self)
 
1614
        self._readable_pipe = readable_pipe
 
1615
        self._writeable_pipe = writeable_pipe
 
1616
 
 
1617
    def _accept_bytes(self, bytes):
 
1618
        """See SmartClientStreamMedium.accept_bytes."""
 
1619
        self._writeable_pipe.write(bytes)
 
1620
 
 
1621
    def _flush(self):
 
1622
        """See SmartClientStreamMedium._flush()."""
 
1623
        self._writeable_pipe.flush()
 
1624
 
 
1625
    def _read_bytes(self, count):
 
1626
        """See SmartClientStreamMedium._read_bytes."""
 
1627
        return self._readable_pipe.read(count)
 
1628
 
 
1629
 
 
1630
class SmartSSHClientMedium(SmartClientStreamMedium):
 
1631
    """A client medium using SSH."""
 
1632
    
 
1633
    def __init__(self, host, port=None, username=None, password=None,
 
1634
            vendor=None):
 
1635
        """Creates a client that will connect on the first use.
 
1636
        
 
1637
        :param vendor: An optional override for the ssh vendor to use. See
 
1638
            bzrlib.transport.ssh for details on ssh vendors.
 
1639
        """
 
1640
        SmartClientStreamMedium.__init__(self)
 
1641
        self._connected = False
 
1642
        self._host = host
 
1643
        self._password = password
 
1644
        self._port = port
 
1645
        self._username = username
 
1646
        self._read_from = None
 
1647
        self._ssh_connection = None
 
1648
        self._vendor = vendor
 
1649
        self._write_to = None
 
1650
 
 
1651
    def _accept_bytes(self, bytes):
 
1652
        """See SmartClientStreamMedium.accept_bytes."""
 
1653
        self._ensure_connection()
 
1654
        self._write_to.write(bytes)
 
1655
 
 
1656
    def disconnect(self):
 
1657
        """See SmartClientMedium.disconnect()."""
 
1658
        if not self._connected:
 
1659
            return
 
1660
        self._read_from.close()
 
1661
        self._write_to.close()
 
1662
        self._ssh_connection.close()
 
1663
        self._connected = False
 
1664
 
 
1665
    def _ensure_connection(self):
 
1666
        """Connect this medium if not already connected."""
 
1667
        if self._connected:
 
1668
            return
 
1669
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
1670
        if self._vendor is None:
 
1671
            vendor = ssh._get_ssh_vendor()
 
1672
        else:
 
1673
            vendor = self._vendor
 
1674
        self._ssh_connection = vendor.connect_ssh(self._username,
 
1675
                self._password, self._host, self._port,
 
1676
                command=[executable, 'serve', '--inet', '--directory=/',
 
1677
                         '--allow-writes'])
 
1678
        self._read_from, self._write_to = \
 
1679
            self._ssh_connection.get_filelike_channels()
 
1680
        self._connected = True
 
1681
 
 
1682
    def _flush(self):
 
1683
        """See SmartClientStreamMedium._flush()."""
 
1684
        self._write_to.flush()
 
1685
 
 
1686
    def _read_bytes(self, count):
 
1687
        """See SmartClientStreamMedium.read_bytes."""
 
1688
        if not self._connected:
 
1689
            raise errors.MediumNotConnected(self)
 
1690
        return self._read_from.read(count)
 
1691
 
 
1692
 
 
1693
class SmartTCPClientMedium(SmartClientStreamMedium):
 
1694
    """A client medium using TCP."""
 
1695
    
 
1696
    def __init__(self, host, port):
 
1697
        """Creates a client that will connect on the first use."""
 
1698
        SmartClientStreamMedium.__init__(self)
 
1699
        self._connected = False
 
1700
        self._host = host
 
1701
        self._port = port
 
1702
        self._socket = None
 
1703
 
 
1704
    def _accept_bytes(self, bytes):
 
1705
        """See SmartClientMedium.accept_bytes."""
 
1706
        self._ensure_connection()
 
1707
        self._socket.sendall(bytes)
 
1708
 
 
1709
    def disconnect(self):
 
1710
        """See SmartClientMedium.disconnect()."""
 
1711
        if not self._connected:
 
1712
            return
 
1713
        self._socket.close()
 
1714
        self._socket = None
 
1715
        self._connected = False
 
1716
 
 
1717
    def _ensure_connection(self):
 
1718
        """Connect this medium if not already connected."""
 
1719
        if self._connected:
 
1720
            return
 
1721
        self._socket = socket.socket()
 
1722
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
1723
        result = self._socket.connect_ex((self._host, int(self._port)))
 
1724
        if result:
 
1725
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
 
1726
                    (self._host, self._port, os.strerror(result)))
 
1727
        self._connected = True
 
1728
 
 
1729
    def _flush(self):
 
1730
        """See SmartClientStreamMedium._flush().
 
1731
        
 
1732
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
1733
        add a means to do a flush, but that can be done in the future.
 
1734
        """
 
1735
 
 
1736
    def _read_bytes(self, count):
 
1737
        """See SmartClientMedium.read_bytes."""
 
1738
        if not self._connected:
 
1739
            raise errors.MediumNotConnected(self)
 
1740
        return self._socket.recv(count)
 
1741
 
 
1742
 
 
1743
class SmartTCPTransport(SmartTransport):
403
1744
    """Connection to smart server over plain tcp.
404
1745
    
405
1746
    This is essentially just a factory to get 'RemoteTransport(url,
418
1759
                raise errors.InvalidURL(
419
1760
                    path=url, extra="invalid port %s" % _port)
420
1761
        medium = SmartTCPClientMedium(_host, _port)
421
 
        super(RemoteTCPTransport, self).__init__(url, medium=medium)
422
 
 
423
 
 
424
 
class RemoteSSHTransport(RemoteTransport):
 
1762
        super(SmartTCPTransport, self).__init__(url, medium=medium)
 
1763
 
 
1764
 
 
1765
class SmartSSHTransport(SmartTransport):
425
1766
    """Connection to smart server over SSH.
426
1767
 
427
1768
    This is essentially just a factory to get 'RemoteTransport(url,
438
1779
            raise errors.InvalidURL(path=url, extra="invalid port %s" % 
439
1780
                _port)
440
1781
        medium = SmartSSHClientMedium(_host, _port, _username, _password)
441
 
        super(RemoteSSHTransport, self).__init__(url, medium=medium)
442
 
 
443
 
 
444
 
class RemoteHTTPTransport(RemoteTransport):
 
1782
        super(SmartSSHTransport, self).__init__(url, medium=medium)
 
1783
 
 
1784
 
 
1785
class SmartHTTPTransport(SmartTransport):
445
1786
    """Just a way to connect between a bzr+http:// url and http://.
446
1787
    
447
 
    This connection operates slightly differently than the RemoteSSHTransport.
 
1788
    This connection operates slightly differently than the SmartSSHTransport.
448
1789
    It uses a plain http:// transport underneath, which defines what remote
449
1790
    .bzr/smart URL we are connected to. From there, all paths that are sent are
450
1791
    sent as relative paths, this way, the remote side can properly
461
1802
        else:
462
1803
            self._http_transport = http_transport
463
1804
        http_medium = self._http_transport.get_smart_medium()
464
 
        super(RemoteHTTPTransport, self).__init__(url, medium=http_medium)
 
1805
        super(SmartHTTPTransport, self).__init__(url, medium=http_medium)
465
1806
 
466
1807
    def _remote_path(self, relpath):
467
1808
        """After connecting HTTP Transport only deals in relative URLs."""
479
1820
        return self._unparse_url(self._combine_paths(self._path, relpath))
480
1821
 
481
1822
    def clone(self, relative_url):
482
 
        """Make a new RemoteHTTPTransport related to me.
 
1823
        """Make a new SmartHTTPTransport related to me.
483
1824
 
484
1825
        This is re-implemented rather than using the default
485
 
        RemoteTransport.clone() because we must be careful about the underlying
 
1826
        SmartTransport.clone() because we must be careful about the underlying
486
1827
        http transport.
487
1828
        """
488
1829
        if relative_url:
492
1833
        # By cloning the underlying http_transport, we are able to share the
493
1834
        # connection.
494
1835
        new_transport = self._http_transport.clone(relative_url)
495
 
        return RemoteHTTPTransport(abs_url, http_transport=new_transport)
 
1836
        return SmartHTTPTransport(abs_url, http_transport=new_transport)
496
1837
 
497
1838
 
498
1839
def get_test_permutations():
499
1840
    """Return (transport, server) permutations for testing."""
500
 
    from bzrlib.smart import server
501
1841
    ### We may need a little more test framework support to construct an
502
1842
    ### appropriate RemoteTransport in the future.
503
 
    return [(RemoteTCPTransport, server.SmartTCPServer_for_testing)]
 
1843
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]