/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

Starting factoring out the smart server client "medium" from the protocol.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Smart-server protocol, client and server.
 
18
 
 
19
Requests are sent as a command and list of arguments, followed by optional
 
20
bulk body data.  Responses are similarly a response and list of arguments,
 
21
followed by bulk body data. ::
 
22
 
 
23
  SEP := '\001'
 
24
    Fields are separated by Ctrl-A.
 
25
  BULK_DATA := CHUNK+ TRAILER
 
26
    Chunks can be repeated as many times as necessary.
 
27
  CHUNK := CHUNK_LEN CHUNK_BODY
 
28
  CHUNK_LEN := DIGIT+ NEWLINE
 
29
    Gives the number of bytes in the following chunk.
 
30
  CHUNK_BODY := BYTE[chunk_len]
 
31
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
 
32
  SUCCESS_TRAILER := 'done' NEWLINE
 
33
  ERROR_TRAILER := 
 
34
 
 
35
Paths are passed across the network.  The client needs to see a namespace that
 
36
includes any repository that might need to be referenced, and the client needs
 
37
to know about a root directory beyond which it cannot ascend.
 
38
 
 
39
Servers run over ssh will typically want to be able to access any path the user 
 
40
can access.  Public servers on the other hand (which might be over http, ssh
 
41
or tcp) will typically want to restrict access to only a particular directory 
 
42
and its children, so will want to do a software virtual root at that level.
 
43
In other words they'll want to rewrite incoming paths to be under that level
 
44
(and prevent escaping using ../ tricks.)
 
45
 
 
46
URLs that include ~ should probably be passed across to the server verbatim
 
47
and the server can expand them.  This will proably not be meaningful when 
 
48
limited to a directory?
 
49
 
 
50
At the bottom level socket, pipes, HTTP server.  For sockets, we have the
 
51
idea that you have multiple requests and get have a read error because the
 
52
other side did shutdown sd send.  For pipes we have read pipe which will have a
 
53
zero read which marks end-of-file.  For HTTP server environment there is not
 
54
end-of-stream because each request coming into the server is independent.
 
55
 
 
56
So we need a wrapper around pipes and sockets to seperate out reqeusts from
 
57
substrate and this will give us a single model which is consist for HTTP,
 
58
sockets and pipes.
 
59
 
 
60
Server-side
 
61
-----------
 
62
 
 
63
 MEDIUM  (factory for protocol, reads bytes & pushes to protocol,
 
64
          uses protocol to detect end-of-request, sends written
 
65
          bytes to client) e.g. socket, pipe, HTTP request handler.
 
66
  ^
 
67
  | bytes.
 
68
  v
 
69
 
 
70
PROTOCOL  (serialisation, deserialisation)  accepts bytes for one
 
71
          request, decodes according to internal state, pushes
 
72
          structured data to handler.  accepts structured data from
 
73
          handler and encodes and writes to the medium.  factory for
 
74
          handler.
 
75
  ^
 
76
  | structured data
 
77
  v
 
78
 
 
79
HANDLER   (domain logic) accepts structured data, operates state
 
80
          machine until the request can be satisfied,
 
81
          sends structured data to the protocol.
 
82
 
 
83
 
 
84
Client-side
 
85
-----------
 
86
 
 
87
 CLIENT             domain logic, accepts domain requests, generated structured
 
88
                    data, reads structured data from responses and turns into
 
89
                    domain data.  Sends structured data to the protocol.
 
90
                    Operates state machines until the request can be delivered
 
91
                    (e.g. reading from a bundle generated in bzrlib to deliver a
 
92
                    complete request).
 
93
 
 
94
                    Possibly this should just be RemoteBzrDir, RemoteTransport,
 
95
                    ...
 
96
  ^
 
97
  | structured data
 
98
  v
 
99
 
 
100
PROTOCOL  (serialisation, deserialisation)  accepts structured data for one
 
101
          request, encodes and writes to the medium.  Reads bytes from the
 
102
          medium, decodes and allows the client to read structured data.
 
103
  ^
 
104
  | bytes.
 
105
  v
 
106
 
 
107
 MEDIUM  (accepts bytes from the protocol & delivers to the remote server.
 
108
          Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
 
109
"""
 
110
 
 
111
 
 
112
# TODO: _translate_error should be on the client, not the transport because
 
113
#     error coding is wire protocol specific.
 
114
 
 
115
# TODO: A plain integer from query_version is too simple; should give some
 
116
# capabilities too?
 
117
 
 
118
# TODO: Server should probably catch exceptions within itself and send them
 
119
# back across the network.  (But shouldn't catch KeyboardInterrupt etc)
 
120
# Also needs to somehow report protocol errors like bad requests.  Need to
 
121
# consider how we'll handle error reporting, e.g. if we get halfway through a
 
122
# bulk transfer and then something goes wrong.
 
123
 
 
124
# TODO: Standard marker at start of request/response lines?
 
125
 
 
126
# TODO: Make each request and response self-validatable, e.g. with checksums.
 
127
#
 
128
# TODO: get/put objects could be changed to gradually read back the data as it
 
129
# comes across the network
 
130
#
 
131
# TODO: What should the server do if it hits an error and has to terminate?
 
132
#
 
133
# TODO: is it useful to allow multiple chunks in the bulk data?
 
134
#
 
135
# TODO: If we get an exception during transmission of bulk data we can't just
 
136
# emit the exception because it won't be seen.
 
137
#   John proposes:  I think it would be worthwhile to have a header on each
 
138
#   chunk, that indicates it is another chunk. Then you can send an 'error'
 
139
#   chunk as long as you finish the previous chunk.
 
140
#
 
141
# TODO: Clone method on Transport; should work up towards parent directory;
 
142
# unclear how this should be stored or communicated to the server... maybe
 
143
# just pass it on all relevant requests?
 
144
#
 
145
# TODO: Better name than clone() for changing between directories.  How about
 
146
# open_dir or change_dir or chdir?
 
147
#
 
148
# TODO: Is it really good to have the notion of current directory within the
 
149
# connection?  Perhaps all Transports should factor out a common connection
 
150
# from the thing that has the directory context?
 
151
#
 
152
# TODO: Pull more things common to sftp and ssh to a higher level.
 
153
#
 
154
# TODO: The server that manages a connection should be quite small and retain
 
155
# minimum state because each of the requests are supposed to be stateless.
 
156
# Then we can write another implementation that maps to http.
 
157
#
 
158
# TODO: What to do when a client connection is garbage collected?  Maybe just
 
159
# abruptly drop the connection?
 
160
#
 
161
# TODO: Server in some cases will need to restrict access to files outside of
 
162
# a particular root directory.  LocalTransport doesn't do anything to stop you
 
163
# ascending above the base directory, so we need to prevent paths
 
164
# containing '..' in either the server or transport layers.  (Also need to
 
165
# consider what happens if someone creates a symlink pointing outside the 
 
166
# directory tree...)
 
167
#
 
168
# TODO: Server should rebase absolute paths coming across the network to put
 
169
# them under the virtual root, if one is in use.  LocalTransport currently
 
170
# doesn't do that; if you give it an absolute path it just uses it.
 
171
 
172
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
 
173
# urlescape them instead.  Indeed possibly this should just literally be
 
174
# http-over-ssh.
 
175
#
 
176
# FIXME: This transport, with several others, has imperfect handling of paths
 
177
# within urls.  It'd probably be better for ".." from a root to raise an error
 
178
# rather than return the same directory as we do at present.
 
179
#
 
180
# TODO: Rather than working at the Transport layer we want a Branch,
 
181
# Repository or BzrDir objects that talk to a server.
 
182
#
 
183
# TODO: Probably want some way for server commands to gradually produce body
 
184
# data rather than passing it as a string; they could perhaps pass an
 
185
# iterator-like callback that will gradually yield data; it probably needs a
 
186
# close() method that will always be closed to do any necessary cleanup.
 
187
#
 
188
# TODO: Split the actual smart server from the ssh encoding of it.
 
189
#
 
190
# TODO: Perhaps support file-level readwrite operations over the transport
 
191
# too.
 
192
#
 
193
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
 
194
# branch doing file-level operations.
 
195
#
 
196
# TODO: jam 20060915 _decode_tuple is acting directly on input over
 
197
#       the socket, and it assumes everything is UTF8 sections separated
 
198
#       by \001. Which means a request like '\002' Will abort the connection
 
199
#       because of a UnicodeDecodeError. It does look like invalid data will
 
200
#       kill the SmartServerStreamMedium, but only with an abort + exception, and 
 
201
#       the overall server shouldn't die.
 
202
 
 
203
from cStringIO import StringIO
 
204
import errno
 
205
import os
 
206
import socket
 
207
import sys
 
208
import tempfile
 
209
import threading
 
210
import urllib
 
211
import urlparse
 
212
 
 
213
from bzrlib import (
 
214
    bzrdir,
 
215
    errors,
 
216
    revision,
 
217
    transport,
 
218
    trace,
 
219
    urlutils,
 
220
    )
 
221
from bzrlib.bundle.serializer import write_bundle
 
222
from bzrlib.trace import mutter
 
223
from bzrlib.transport import local
 
224
 
 
225
# must do this otherwise urllib can't parse the urls properly :(
 
226
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
 
227
    transport.register_urlparse_netloc_protocol(scheme)
 
228
del scheme
 
229
 
 
230
 
 
231
def _recv_tuple(from_file):
 
232
    req_line = from_file.readline()
 
233
    return _decode_tuple(req_line)
 
234
 
 
235
 
 
236
def _decode_tuple(req_line):
 
237
    if req_line == None or req_line == '':
 
238
        return None
 
239
    if req_line[-1] != '\n':
 
240
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
 
241
    return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
 
242
 
 
243
 
 
244
def _encode_tuple(args):
 
245
    """Encode the tuple args to a bytestream."""
 
246
    return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
 
247
 
 
248
 
 
249
class SmartProtocolBase(object):
 
250
    """Methods common to client and server"""
 
251
 
 
252
    def _send_bulk_data(self, body, a_file=None):
 
253
        """Send chunked body data"""
 
254
        assert isinstance(body, str)
 
255
        bytes = self._encode_bulk_data(body)
 
256
        self._write_and_flush(bytes, a_file)
 
257
 
 
258
    def _encode_bulk_data(self, body):
 
259
        """Encode body as a bulk data chunk."""
 
260
        return ''.join(('%d\n' % len(body), body, 'done\n'))
 
261
 
 
262
    # TODO: this only actually accomodates a single block; possibly should support
 
263
    # multiple chunks?
 
264
    def _recv_bulk(self):
 
265
        chunk_len = self._in.readline()
 
266
        try:
 
267
            chunk_len = int(chunk_len)
 
268
        except ValueError:
 
269
            raise errors.SmartProtocolError("bad chunk length line %r" % chunk_len)
 
270
        bulk = self._in.read(chunk_len)
 
271
        if len(bulk) != chunk_len:
 
272
            raise errors.SmartProtocolError("short read fetching bulk data chunk")
 
273
        self._recv_trailer()
 
274
        return bulk
 
275
 
 
276
    def _recv_tuple(self):
 
277
        return _recv_tuple(self._in)
 
278
 
 
279
    def _recv_trailer(self):
 
280
        resp = self._recv_tuple()
 
281
        if resp == ('done', ):
 
282
            return
 
283
        else:
 
284
            self._translate_error(resp)
 
285
 
 
286
    def _serialise_offsets(self, offsets):
 
287
        """Serialise a readv offset list."""
 
288
        txt = []
 
289
        for start, length in offsets:
 
290
            txt.append('%d,%d' % (start, length))
 
291
        return '\n'.join(txt)
 
292
 
 
293
    def _write_and_flush(self, bytes, a_file=None):
 
294
        """Write bytes to self._out and flush it."""
 
295
        # XXX: this will be inefficient.  Just ask Robert.
 
296
        if a_file is None:
 
297
            a_file = self._out
 
298
        a_file.write(bytes)
 
299
        a_file.flush()
 
300
 
 
301
 
 
302
class SmartServerRequestProtocolOne(SmartProtocolBase):
 
303
    """Server-side encoding and decoding logic for smart version 1."""
 
304
    
 
305
    def __init__(self, output_stream, backing_transport):
 
306
        self._out_stream = output_stream
 
307
        self._backing_transport = backing_transport
 
308
        self.finished_reading = False
 
309
        self.in_buffer = ''
 
310
        self.has_dispatched = False
 
311
        self.request = None
 
312
        self._body_decoder = None
 
313
 
 
314
    def accept_bytes(self, bytes):
 
315
        """Take bytes, and advance the internal state machine appropriately.
 
316
        
 
317
        :param bytes: must be a byte string
 
318
        """
 
319
        assert isinstance(bytes, str)
 
320
        self.in_buffer += bytes
 
321
        if not self.has_dispatched:
 
322
            if '\n' not in self.in_buffer:
 
323
                # no command line yet
 
324
                return
 
325
            self.has_dispatched = True
 
326
            # XXX if in_buffer not \n-terminated this will do the wrong
 
327
            # thing.
 
328
            try:
 
329
                assert self.in_buffer.endswith('\n')
 
330
                req_args = _decode_tuple(self.in_buffer)
 
331
                self.in_buffer = ''
 
332
                self.request = SmartServerRequestHandler(
 
333
                    self._backing_transport)
 
334
                self.request.dispatch_command(req_args[0], req_args[1:])
 
335
                if self.request.finished_reading:
 
336
                    # trivial request
 
337
                    self._send_response(self.request.response.args,
 
338
                        self.request.response.body)
 
339
                self.sync_with_request(self.request)
 
340
                return self.request
 
341
            except KeyboardInterrupt:
 
342
                raise
 
343
            except Exception, exception:
 
344
                # everything else: pass to client, flush, and quit
 
345
                self._send_response(('error', str(exception)))
 
346
                return None
 
347
 
 
348
        else:
 
349
            if self.finished_reading:
 
350
                # nothing to do.XXX: this routine should be a single state 
 
351
                # machine too.
 
352
                return
 
353
            if self._body_decoder is None:
 
354
                self._body_decoder = LengthPrefixedBodyDecoder()
 
355
            self._body_decoder.accept_bytes(self.in_buffer)
 
356
            self.in_buffer = self._body_decoder.unused_data
 
357
            body_data = self._body_decoder.read_pending_data()
 
358
            self.request.accept_body(body_data)
 
359
            if self._body_decoder.finished_reading:
 
360
                self.request.end_of_body()
 
361
                assert self.request.finished_reading, \
 
362
                    "no more body, request not finished"
 
363
            self.sync_with_request(self.request)
 
364
            if self.request.response is not None:
 
365
                self._send_response(self.request.response.args,
 
366
                    self.request.response.body)
 
367
            else:
 
368
                assert not self.request.finished_reading, \
 
369
                    "no response and we have finished reading."
 
370
 
 
371
    def _send_response(self, args, body=None):
 
372
        """Send a smart server response down the output stream."""
 
373
        self._out_stream.write(_encode_tuple(args))
 
374
        if body is None:
 
375
            self._out_stream.flush()
 
376
        else:
 
377
            self._send_bulk_data(body, self._out_stream)
 
378
            #self._out_stream.write('BLARGH')
 
379
 
 
380
    def sync_with_request(self, request):
 
381
        self.finished_reading = request.finished_reading
 
382
        
 
383
        
 
384
class LengthPrefixedBodyDecoder(object):
 
385
    """Decodes the length-prefixed bulk data."""
 
386
    
 
387
    def __init__(self):
 
388
        self.finished_reading = False
 
389
        self.unused_data = ''
 
390
        self.state_accept = self._state_accept_expecting_length
 
391
        self.state_read = self._state_read_no_data
 
392
        self._in_buffer = ''
 
393
        self._trailer_buffer = ''
 
394
    
 
395
    def accept_bytes(self, bytes):
 
396
        """Decode as much of bytes as possible.
 
397
 
 
398
        If 'bytes' contains too much data it will be appended to
 
399
        self.unused_data.
 
400
 
 
401
        finished_reading will be set when no more data is required.  Further
 
402
        data will be appended to self.unused_data.
 
403
        """
 
404
        # accept_bytes is allowed to change the state
 
405
        current_state = self.state_accept
 
406
        self.state_accept(bytes)
 
407
        while current_state != self.state_accept:
 
408
            current_state = self.state_accept
 
409
            self.state_accept('')
 
410
 
 
411
    def read_pending_data(self):
 
412
        """Return any pending data that has been decoded."""
 
413
        return self.state_read()
 
414
 
 
415
    def _state_accept_expecting_length(self, bytes):
 
416
        self._in_buffer += bytes
 
417
        pos = self._in_buffer.find('\n')
 
418
        if pos == -1:
 
419
            return
 
420
        self.bytes_left = int(self._in_buffer[:pos])
 
421
        self._in_buffer = self._in_buffer[pos+1:]
 
422
        self.bytes_left -= len(self._in_buffer)
 
423
        self.state_accept = self._state_accept_reading_body
 
424
        self.state_read = self._state_read_in_buffer
 
425
 
 
426
    def _state_accept_reading_body(self, bytes):
 
427
        self._in_buffer += bytes
 
428
        self.bytes_left -= len(bytes)
 
429
        if self.bytes_left <= 0:
 
430
            # Finished with body
 
431
            if self.bytes_left != 0:
 
432
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
 
433
                self._in_buffer = self._in_buffer[:self.bytes_left]
 
434
            self.state_accept = self._state_accept_reading_trailer
 
435
        
 
436
    def _state_accept_reading_trailer(self, bytes):
 
437
        self._trailer_buffer += bytes
 
438
        if self._trailer_buffer.startswith('done\n'):
 
439
            self.unused_data = self._trailer_buffer[len('done\n'):]
 
440
            self.state_accept = self._state_accept_reading_unused
 
441
            self.finished_reading = True
 
442
    
 
443
    def _state_accept_reading_unused(self, bytes):
 
444
        self.unused_data += bytes
 
445
 
 
446
    def _state_read_no_data(self):
 
447
        return ''
 
448
 
 
449
    def _state_read_in_buffer(self):
 
450
        result = self._in_buffer
 
451
        self._in_buffer = ''
 
452
        return result
 
453
            
 
454
        
 
455
 
 
456
        
 
457
 
 
458
class SmartServerStreamMedium(SmartProtocolBase):
 
459
    """Handles smart commands coming over a stream.
 
460
 
 
461
    The stream may be a pipe connected to sshd, or a tcp socket, or an
 
462
    in-process fifo for testing.
 
463
 
 
464
    One instance is created for each connected client; it can serve multiple
 
465
    requests in the lifetime of the connection.
 
466
 
 
467
    The server passes requests through to an underlying backing transport, 
 
468
    which will typically be a LocalTransport looking at the server's filesystem.
 
469
    """
 
470
 
 
471
    def __init__(self, in_file, out_file, backing_transport):
 
472
        """Construct new server.
 
473
 
 
474
        :param in_file: Python file from which requests can be read.
 
475
        :param out_file: Python file to write responses.
 
476
        :param backing_transport: Transport for the directory served.
 
477
        """
 
478
        self._in = in_file
 
479
        self._out = out_file
 
480
        self.backing_transport = backing_transport
 
481
 
 
482
    def _recv_tuple(self):
 
483
        """Read a request from the client and return as a tuple.
 
484
        
 
485
        Returns None at end of file (if the client closed the connection.)
 
486
        """
 
487
        # ** Deserialise and read bytes
 
488
        return _recv_tuple(self._in)
 
489
 
 
490
    def _send_tuple(self, args):
 
491
        """Send response header"""
 
492
        # ** serialise and write bytes
 
493
        return self._write_and_flush(_encode_tuple(args))
 
494
 
 
495
    def _send_error_and_disconnect(self, exception):
 
496
        # ** serialise and write bytes
 
497
        self._send_tuple(('error', str(exception)))
 
498
        ## self._out.close()
 
499
        ## self._in.close()
 
500
 
 
501
    def _serve_one_request(self):
 
502
        """Read one request from input, process, send back a response.
 
503
        
 
504
        :return: False if the server should terminate, otherwise None.
 
505
        """
 
506
        # ** deserialise, read bytes, serialise and write bytes
 
507
        req_line = self._in.readline()
 
508
        # this should just test "req_line == ''", surely?  -- Andrew Bennetts
 
509
        if req_line in ('', None):
 
510
            # client closed connection
 
511
            return False  # shutdown server
 
512
        try:
 
513
            protocol = SmartServerRequestProtocolOne(self._out,
 
514
                self.backing_transport)
 
515
            protocol.accept_bytes(req_line)
 
516
            if not protocol.finished_reading:
 
517
                # this boils down to readline which wont block on open sockets
 
518
                # without data. We should really though read as much as is
 
519
                # available and then hand to that accept_bytes without this
 
520
                # silly double-decode.
 
521
                bulk = self._recv_bulk()
 
522
                bulk_bytes = ''.join(('%d\n' % len(bulk), bulk, 'done\n'))
 
523
                protocol.accept_bytes(bulk_bytes)
 
524
                # might be nice to do protocol.end_of_bytes()
 
525
                # because self._recv_bulk reads all the bytes, this must finish
 
526
                # after one delivery of data rather than looping.
 
527
                assert protocol.finished_reading
 
528
        except KeyboardInterrupt:
 
529
            raise
 
530
        except Exception, e:
 
531
            # everything else: pass to client, flush, and quit
 
532
            self._send_error_and_disconnect(e)
 
533
            return False
 
534
 
 
535
    def serve(self):
 
536
        """Serve requests until the client disconnects."""
 
537
        # Keep a reference to stderr because the sys module's globals get set to
 
538
        # None during interpreter shutdown.
 
539
        from sys import stderr
 
540
        try:
 
541
            while self._serve_one_request() != False:
 
542
                pass
 
543
        except Exception, e:
 
544
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
545
            raise
 
546
 
 
547
 
 
548
class SmartServerResponse(object):
 
549
    """Response generated by SmartServerRequestHandler."""
 
550
 
 
551
    def __init__(self, args, body=None):
 
552
        self.args = args
 
553
        self.body = body
 
554
 
 
555
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
 
556
# for delivering the data for a request. This could be done with as the
 
557
# StreamServer, though that would create conflation between request and response
 
558
# which may be undesirable.
 
559
 
 
560
 
 
561
class SmartServerRequestHandler(object):
 
562
    """Protocol logic for smart server.
 
563
    
 
564
    This doesn't handle serialization at all, it just processes requests and
 
565
    creates responses.
 
566
    """
 
567
 
 
568
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
 
569
    # not contain encoding or decoding logic to allow the wire protocol to vary
 
570
    # from the object protocol: we will want to tweak the wire protocol separate
 
571
    # from the object model, and ideally we will be able to do that without
 
572
    # having a SmartServerRequestHandler subclass for each wire protocol, rather
 
573
    # just a Protocol subclass.
 
574
 
 
575
    # TODO: Better way of representing the body for commands that take it,
 
576
    # and allow it to be streamed into the server.
 
577
    
 
578
    def __init__(self, backing_transport):
 
579
        self._backing_transport = backing_transport
 
580
        self._converted_command = False
 
581
        self.finished_reading = False
 
582
        self._body_bytes = ''
 
583
        self.response = None
 
584
 
 
585
    def accept_body(self, bytes):
 
586
        """Accept body data.
 
587
 
 
588
        This should be overriden for each command that desired body data to
 
589
        handle the right format of that data. I.e. plain bytes, a bundle etc.
 
590
 
 
591
        The deserialisation into that format should be done in the Protocol
 
592
        object. Set self.desired_body_format to the format your method will
 
593
        handle.
 
594
        """
 
595
        # default fallback is to accumulate bytes.
 
596
        self._body_bytes += bytes
 
597
        
 
598
    def _end_of_body_handler(self):
 
599
        """An unimplemented end of body handler."""
 
600
        raise NotImplementedError(self._end_of_body_handler)
 
601
        
 
602
    def do_hello(self):
 
603
        """Answer a version request with my version."""
 
604
        return SmartServerResponse(('ok', '1'))
 
605
 
 
606
    def do_has(self, relpath):
 
607
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
 
608
        return SmartServerResponse((r,))
 
609
 
 
610
    def do_get(self, relpath):
 
611
        backing_bytes = self._backing_transport.get_bytes(relpath)
 
612
        return SmartServerResponse(('ok',), backing_bytes)
 
613
 
 
614
    def _deserialise_optional_mode(self, mode):
 
615
        # XXX: FIXME this should be on the protocol object.
 
616
        if mode == '':
 
617
            return None
 
618
        else:
 
619
            return int(mode)
 
620
 
 
621
    def do_append(self, relpath, mode):
 
622
        self._converted_command = True
 
623
        self._relpath = relpath
 
624
        self._mode = self._deserialise_optional_mode(mode)
 
625
        self._end_of_body_handler = self._handle_do_append_end
 
626
    
 
627
    def _handle_do_append_end(self):
 
628
        old_length = self._backing_transport.append_bytes(
 
629
            self._relpath, self._body_bytes, self._mode)
 
630
        self.response = SmartServerResponse(('appended', '%d' % old_length))
 
631
 
 
632
    def do_delete(self, relpath):
 
633
        self._backing_transport.delete(relpath)
 
634
 
 
635
    def do_iter_files_recursive(self, abspath):
 
636
        # XXX: the path handling needs some thought.
 
637
        #relpath = self._backing_transport.relpath(abspath)
 
638
        transport = self._backing_transport.clone(abspath)
 
639
        filenames = transport.iter_files_recursive()
 
640
        return SmartServerResponse(('names',) + tuple(filenames))
 
641
 
 
642
    def do_list_dir(self, relpath):
 
643
        filenames = self._backing_transport.list_dir(relpath)
 
644
        return SmartServerResponse(('names',) + tuple(filenames))
 
645
 
 
646
    def do_mkdir(self, relpath, mode):
 
647
        self._backing_transport.mkdir(relpath,
 
648
                                      self._deserialise_optional_mode(mode))
 
649
 
 
650
    def do_move(self, rel_from, rel_to):
 
651
        self._backing_transport.move(rel_from, rel_to)
 
652
 
 
653
    def do_put(self, relpath, mode):
 
654
        self._converted_command = True
 
655
        self._relpath = relpath
 
656
        self._mode = self._deserialise_optional_mode(mode)
 
657
        self._end_of_body_handler = self._handle_do_put
 
658
 
 
659
    def _handle_do_put(self):
 
660
        self._backing_transport.put_bytes(self._relpath,
 
661
                self._body_bytes, self._mode)
 
662
        self.response = SmartServerResponse(('ok',))
 
663
 
 
664
    def _deserialise_offsets(self, text):
 
665
        # XXX: FIXME this should be on the protocol object.
 
666
        offsets = []
 
667
        for line in text.split('\n'):
 
668
            if not line:
 
669
                continue
 
670
            start, length = line.split(',')
 
671
            offsets.append((int(start), int(length)))
 
672
        return offsets
 
673
 
 
674
    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
 
675
        self._converted_command = True
 
676
        self._end_of_body_handler = self._handle_put_non_atomic
 
677
        self._relpath = relpath
 
678
        self._dir_mode = self._deserialise_optional_mode(dir_mode)
 
679
        self._mode = self._deserialise_optional_mode(mode)
 
680
        # a boolean would be nicer XXX
 
681
        self._create_parent = (create_parent == 'T')
 
682
 
 
683
    def _handle_put_non_atomic(self):
 
684
        self._backing_transport.put_bytes_non_atomic(self._relpath,
 
685
                self._body_bytes,
 
686
                mode=self._mode,
 
687
                create_parent_dir=self._create_parent,
 
688
                dir_mode=self._dir_mode)
 
689
        self.response = SmartServerResponse(('ok',))
 
690
 
 
691
    def do_readv(self, relpath):
 
692
        self._converted_command = True
 
693
        self._end_of_body_handler = self._handle_readv_offsets
 
694
        self._relpath = relpath
 
695
 
 
696
    def end_of_body(self):
 
697
        """No more body data will be received."""
 
698
        self._run_handler_code(self._end_of_body_handler, (), {})
 
699
        # cannot read after this.
 
700
        self.finished_reading = True
 
701
 
 
702
    def _handle_readv_offsets(self):
 
703
        """accept offsets for a readv request."""
 
704
        offsets = self._deserialise_offsets(self._body_bytes)
 
705
        backing_bytes = ''.join(bytes for offset, bytes in
 
706
            self._backing_transport.readv(self._relpath, offsets))
 
707
        self.response = SmartServerResponse(('readv',), backing_bytes)
 
708
        
 
709
    def do_rename(self, rel_from, rel_to):
 
710
        self._backing_transport.rename(rel_from, rel_to)
 
711
 
 
712
    def do_rmdir(self, relpath):
 
713
        self._backing_transport.rmdir(relpath)
 
714
 
 
715
    def do_stat(self, relpath):
 
716
        stat = self._backing_transport.stat(relpath)
 
717
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
 
718
        
 
719
    def do_get_bundle(self, path, revision_id):
 
720
        # open transport relative to our base
 
721
        t = self._backing_transport.clone(path)
 
722
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
 
723
        repo = control.open_repository()
 
724
        tmpf = tempfile.TemporaryFile()
 
725
        base_revision = revision.NULL_REVISION
 
726
        write_bundle(repo, revision_id, base_revision, tmpf)
 
727
        tmpf.seek(0)
 
728
        return SmartServerResponse((), tmpf.read())
 
729
 
 
730
    def dispatch_command(self, cmd, args):
 
731
        """Deprecated compatibility method.""" # XXX XXX
 
732
        func = getattr(self, 'do_' + cmd, None)
 
733
        if func is None:
 
734
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
 
735
        self._run_handler_code(func, args, {})
 
736
 
 
737
    def _run_handler_code(self, callable, args, kwargs):
 
738
        """Run some handler specific code 'callable'.
 
739
 
 
740
        If a result is returned, it is considered to be the commands response,
 
741
        and finished_reading is set true, and its assigned to self.response.
 
742
 
 
743
        Any exceptions caught are translated and a response object created
 
744
        from them.
 
745
        """
 
746
        result = self._call_converting_errors(callable, args, kwargs)
 
747
        if result is not None:
 
748
            self.response = result
 
749
            self.finished_reading = True
 
750
        # handle unconverted commands
 
751
        if not self._converted_command:
 
752
            self.finished_reading = True
 
753
            if result is None:
 
754
                self.response = SmartServerResponse(('ok',))
 
755
 
 
756
    def _call_converting_errors(self, callable, args, kwargs):
 
757
        """Call callable converting errors to Response objects."""
 
758
        try:
 
759
            return callable(*args, **kwargs)
 
760
        except errors.NoSuchFile, e:
 
761
            return SmartServerResponse(('NoSuchFile', e.path))
 
762
        except errors.FileExists, e:
 
763
            return SmartServerResponse(('FileExists', e.path))
 
764
        except errors.DirectoryNotEmpty, e:
 
765
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
 
766
        except errors.ShortReadvError, e:
 
767
            return SmartServerResponse(('ShortReadvError',
 
768
                e.path, str(e.offset), str(e.length), str(e.actual)))
 
769
        except UnicodeError, e:
 
770
            # If it is a DecodeError, than most likely we are starting
 
771
            # with a plain string
 
772
            str_or_unicode = e.object
 
773
            if isinstance(str_or_unicode, unicode):
 
774
                val = u'u:' + str_or_unicode
 
775
            else:
 
776
                val = u's:' + str_or_unicode.encode('base64')
 
777
            # This handles UnicodeEncodeError or UnicodeDecodeError
 
778
            return SmartServerResponse((e.__class__.__name__,
 
779
                    e.encoding, val, str(e.start), str(e.end), e.reason))
 
780
        except errors.TransportNotPossible, e:
 
781
            if e.msg == "readonly transport":
 
782
                return SmartServerResponse(('ReadOnlyError', ))
 
783
            else:
 
784
                raise
 
785
 
 
786
 
 
787
class SmartTCPServer(object):
 
788
    """Listens on a TCP socket and accepts connections from smart clients"""
 
789
 
 
790
    def __init__(self, backing_transport=None, host='127.0.0.1', port=0):
 
791
        """Construct a new server.
 
792
 
 
793
        To actually start it running, call either start_background_thread or
 
794
        serve.
 
795
 
 
796
        :param host: Name of the interface to listen on.
 
797
        :param port: TCP port to listen on, or 0 to allocate a transient port.
 
798
        """
 
799
        if backing_transport is None:
 
800
            backing_transport = memory.MemoryTransport()
 
801
        self._server_socket = socket.socket()
 
802
        self._server_socket.bind((host, port))
 
803
        self.port = self._server_socket.getsockname()[1]
 
804
        self._server_socket.listen(1)
 
805
        self._server_socket.settimeout(1)
 
806
        self.backing_transport = backing_transport
 
807
 
 
808
    def serve(self):
 
809
        # let connections timeout so that we get a chance to terminate
 
810
        # Keep a reference to the exceptions we want to catch because the socket
 
811
        # module's globals get set to None during interpreter shutdown.
 
812
        from socket import timeout as socket_timeout
 
813
        from socket import error as socket_error
 
814
        self._should_terminate = False
 
815
        while not self._should_terminate:
 
816
            try:
 
817
                self.accept_and_serve()
 
818
            except socket_timeout:
 
819
                # just check if we're asked to stop
 
820
                pass
 
821
            except socket_error, e:
 
822
                trace.warning("client disconnected: %s", e)
 
823
                pass
 
824
 
 
825
    def get_url(self):
 
826
        """Return the url of the server"""
 
827
        return "bzr://%s:%d/" % self._server_socket.getsockname()
 
828
 
 
829
    def accept_and_serve(self):
 
830
        conn, client_addr = self._server_socket.accept()
 
831
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
832
        from_client = conn.makefile('r')
 
833
        to_client = conn.makefile('w')
 
834
        handler = SmartServerStreamMedium(from_client, to_client,
 
835
                self.backing_transport)
 
836
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
 
837
        connection_thread.setDaemon(True)
 
838
        connection_thread.start()
 
839
 
 
840
    def start_background_thread(self):
 
841
        self._server_thread = threading.Thread(None,
 
842
                self.serve,
 
843
                name='server-' + self.get_url())
 
844
        self._server_thread.setDaemon(True)
 
845
        self._server_thread.start()
 
846
 
 
847
    def stop_background_thread(self):
 
848
        self._should_terminate = True
 
849
        # self._server_socket.close()
 
850
        # we used to join the thread, but it's not really necessary; it will
 
851
        # terminate in time
 
852
        ## self._server_thread.join()
 
853
 
 
854
 
 
855
class SmartTCPServer_for_testing(SmartTCPServer):
 
856
    """Server suitable for use by transport tests.
 
857
    
 
858
    This server is backed by the process's cwd.
 
859
    """
 
860
 
 
861
    def __init__(self):
 
862
        self._homedir = os.getcwd()
 
863
        # The server is set up by default like for ssh access: the client
 
864
        # passes filesystem-absolute paths; therefore the server must look
 
865
        # them up relative to the root directory.  it might be better to act
 
866
        # a public server and have the server rewrite paths into the test
 
867
        # directory.
 
868
        SmartTCPServer.__init__(self, transport.get_transport("file:///"))
 
869
        
 
870
    def setUp(self):
 
871
        """Set up server for testing"""
 
872
        self.start_background_thread()
 
873
 
 
874
    def tearDown(self):
 
875
        self.stop_background_thread()
 
876
 
 
877
    def get_url(self):
 
878
        """Return the url of the server"""
 
879
        host, port = self._server_socket.getsockname()
 
880
        # XXX: I think this is likely to break on windows -- self._homedir will
 
881
        # have backslashes (and maybe a drive letter?).
 
882
        #  -- Andrew Bennetts, 2006-08-29
 
883
        return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
 
884
 
 
885
    def get_bogus_url(self):
 
886
        """Return a URL which will fail to connect"""
 
887
        return 'bzr://127.0.0.1:1/'
 
888
 
 
889
 
 
890
class SmartStat(object):
 
891
 
 
892
    def __init__(self, size, mode):
 
893
        self.st_size = size
 
894
        self.st_mode = mode
 
895
 
 
896
 
 
897
class SmartTransport(transport.Transport):
 
898
    """Connection to a smart server.
 
899
 
 
900
    The connection holds references to pipes that can be used to send requests
 
901
    to the server.
 
902
 
 
903
    The connection has a notion of the current directory to which it's
 
904
    connected; this is incorporated in filenames passed to the server.
 
905
    
 
906
    This supports some higher-level RPC operations and can also be treated 
 
907
    like a Transport to do file-like operations.
 
908
 
 
909
    The connection can be made over a tcp socket, or (in future) an ssh pipe
 
910
    or a series of http requests.  There are concrete subclasses for each
 
911
    type: SmartTCPTransport, etc.
 
912
    """
 
913
 
 
914
    # IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
 
915
    # responsibilities: Put those on SmartClient or similar. This is vital for
 
916
    # the ability to support multiple versions of the smart protocol over time:
 
917
    # SmartTransport is an adapter from the Transport object model to the 
 
918
    # SmartClient model, not an encoder.
 
919
 
 
920
    def __init__(self, url, clone_from=None, medium=None):
 
921
        """Constructor.
 
922
 
 
923
        :param medium: The medium to use for this RemoteTransport. This must be
 
924
            supplied if clone_from is None.
 
925
        """
 
926
        ### Technically super() here is faulty because Transport's __init__
 
927
        ### fails to take 2 parameters, and if super were to choose a silly
 
928
        ### initialisation order things would blow up. 
 
929
        if not url.endswith('/'):
 
930
            url += '/'
 
931
        super(SmartTransport, self).__init__(url)
 
932
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
 
933
                transport.split_url(url)
 
934
        if clone_from is None:
 
935
            self._client = medium
 
936
        else:
 
937
            # credentials may be stripped from the base in some circumstances
 
938
            # as yet to be clearly defined or documented, so copy them.
 
939
            self._username = clone_from._username
 
940
            # reuse same connection
 
941
            self._client = clone_from._client
 
942
        assert self._client is not None
 
943
 
 
944
    def abspath(self, relpath):
 
945
        """Return the full url to the given relative path.
 
946
        
 
947
        @param relpath: the relative path or path components
 
948
        @type relpath: str or list
 
949
        """
 
950
        return self._unparse_url(self._remote_path(relpath))
 
951
    
 
952
    def clone(self, relative_url):
 
953
        """Make a new SmartTransport related to me, sharing the same connection.
 
954
 
 
955
        This essentially opens a handle on a different remote directory.
 
956
        """
 
957
        if relative_url is None:
 
958
            return SmartTransport(self.base, self)
 
959
        else:
 
960
            return SmartTransport(self.abspath(relative_url), self)
 
961
 
 
962
    def is_readonly(self):
 
963
        """Smart server transport can do read/write file operations."""
 
964
        return False
 
965
                                                   
 
966
    def get_smart_client(self):
 
967
        return self._client
 
968
 
 
969
    def get_smart_medium(self):
 
970
        return self._client
 
971
                                                   
 
972
    def _unparse_url(self, path):
 
973
        """Return URL for a path.
 
974
 
 
975
        :see: SFTPUrlHandling._unparse_url
 
976
        """
 
977
        # TODO: Eventually it should be possible to unify this with
 
978
        # SFTPUrlHandling._unparse_url?
 
979
        if path == '':
 
980
            path = '/'
 
981
        path = urllib.quote(path)
 
982
        netloc = urllib.quote(self._host)
 
983
        if self._username is not None:
 
984
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
 
985
        if self._port is not None:
 
986
            netloc = '%s:%d' % (netloc, self._port)
 
987
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
 
988
 
 
989
    def _remote_path(self, relpath):
 
990
        """Returns the Unicode version of the absolute path for relpath."""
 
991
        return self._combine_paths(self._path, relpath)
 
992
 
 
993
    def has(self, relpath):
 
994
        """Indicate whether a remote file of the given name exists or not.
 
995
 
 
996
        :see: Transport.has()
 
997
        """
 
998
        resp = self._client._call('has', self._remote_path(relpath))
 
999
        if resp == ('yes', ):
 
1000
            return True
 
1001
        elif resp == ('no', ):
 
1002
            return False
 
1003
        else:
 
1004
            self._translate_error(resp)
 
1005
 
 
1006
    def get(self, relpath):
 
1007
        """Return file-like object reading the contents of a remote file.
 
1008
        
 
1009
        :see: Transport.get_bytes()/get_file()
 
1010
        """
 
1011
        remote = self._remote_path(relpath)
 
1012
        resp = self._client._call('get', remote)
 
1013
        if resp != ('ok', ):
 
1014
            self._translate_error(resp, relpath)
 
1015
        return StringIO(self._client._recv_bulk())
 
1016
 
 
1017
    def _serialise_optional_mode(self, mode):
 
1018
        if mode is None:
 
1019
            return ''
 
1020
        else:
 
1021
            return '%d' % mode
 
1022
 
 
1023
    def mkdir(self, relpath, mode=None):
 
1024
        resp = self._client._call('mkdir', 
 
1025
                                  self._remote_path(relpath), 
 
1026
                                  self._serialise_optional_mode(mode))
 
1027
        self._translate_error(resp)
 
1028
 
 
1029
    def put_bytes(self, relpath, upload_contents, mode=None):
 
1030
        # FIXME: upload_file is probably not safe for non-ascii characters -
 
1031
        # should probably just pass all parameters as length-delimited
 
1032
        # strings?
 
1033
        resp = self._client._call_with_upload(
 
1034
            'put',
 
1035
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
1036
            upload_contents)
 
1037
        self._translate_error(resp)
 
1038
 
 
1039
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
 
1040
                             create_parent_dir=False,
 
1041
                             dir_mode=None):
 
1042
        """See Transport.put_bytes_non_atomic."""
 
1043
        # FIXME: no encoding in the transport!
 
1044
        create_parent_str = 'F'
 
1045
        if create_parent_dir:
 
1046
            create_parent_str = 'T'
 
1047
 
 
1048
        resp = self._client._call_with_upload(
 
1049
            'put_non_atomic',
 
1050
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
 
1051
             create_parent_str, self._serialise_optional_mode(dir_mode)),
 
1052
            bytes)
 
1053
        self._translate_error(resp)
 
1054
 
 
1055
    def put_file(self, relpath, upload_file, mode=None):
 
1056
        # its not ideal to seek back, but currently put_non_atomic_file depends
 
1057
        # on transports not reading before failing - which is a faulty
 
1058
        # assumption I think - RBC 20060915
 
1059
        pos = upload_file.tell()
 
1060
        try:
 
1061
            return self.put_bytes(relpath, upload_file.read(), mode)
 
1062
        except:
 
1063
            upload_file.seek(pos)
 
1064
            raise
 
1065
 
 
1066
    def put_file_non_atomic(self, relpath, f, mode=None,
 
1067
                            create_parent_dir=False,
 
1068
                            dir_mode=None):
 
1069
        return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
 
1070
                                         create_parent_dir=create_parent_dir,
 
1071
                                         dir_mode=dir_mode)
 
1072
 
 
1073
    def append_file(self, relpath, from_file, mode=None):
 
1074
        return self.append_bytes(relpath, from_file.read(), mode)
 
1075
        
 
1076
    def append_bytes(self, relpath, bytes, mode=None):
 
1077
        resp = self._client._call_with_upload(
 
1078
            'append',
 
1079
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
1080
            bytes)
 
1081
        if resp[0] == 'appended':
 
1082
            return int(resp[1])
 
1083
        self._translate_error(resp)
 
1084
 
 
1085
    def delete(self, relpath):
 
1086
        resp = self._client._call('delete', self._remote_path(relpath))
 
1087
        self._translate_error(resp)
 
1088
 
 
1089
    def readv(self, relpath, offsets):
 
1090
        if not offsets:
 
1091
            return
 
1092
 
 
1093
        offsets = list(offsets)
 
1094
 
 
1095
        sorted_offsets = sorted(offsets)
 
1096
        # turn the list of offsets into a stack
 
1097
        offset_stack = iter(offsets)
 
1098
        cur_offset_and_size = offset_stack.next()
 
1099
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
1100
                               limit=self._max_readv_combine,
 
1101
                               fudge_factor=self._bytes_to_read_before_seek))
 
1102
 
 
1103
 
 
1104
        resp = self._client._call_with_upload(
 
1105
            'readv',
 
1106
            (self._remote_path(relpath),),
 
1107
            self._client._serialise_offsets((c.start, c.length) for c in coalesced))
 
1108
 
 
1109
        if resp[0] != 'readv':
 
1110
            # This should raise an exception
 
1111
            self._translate_error(resp)
 
1112
            return
 
1113
 
 
1114
        data = self._client._recv_bulk()
 
1115
        # Cache the results, but only until they have been fulfilled
 
1116
        data_map = {}
 
1117
        for c_offset in coalesced:
 
1118
            if len(data) < c_offset.length:
 
1119
                raise errors.ShortReadvError(relpath, c_offset.start,
 
1120
                            c_offset.length, actual=len(data))
 
1121
            for suboffset, subsize in c_offset.ranges:
 
1122
                key = (c_offset.start+suboffset, subsize)
 
1123
                data_map[key] = data[suboffset:suboffset+subsize]
 
1124
            data = data[c_offset.length:]
 
1125
 
 
1126
            # Now that we've read some data, see if we can yield anything back
 
1127
            while cur_offset_and_size in data_map:
 
1128
                this_data = data_map.pop(cur_offset_and_size)
 
1129
                yield cur_offset_and_size[0], this_data
 
1130
                cur_offset_and_size = offset_stack.next()
 
1131
 
 
1132
    def rename(self, rel_from, rel_to):
 
1133
        self._call('rename', 
 
1134
                   self._remote_path(rel_from),
 
1135
                   self._remote_path(rel_to))
 
1136
 
 
1137
    def move(self, rel_from, rel_to):
 
1138
        self._call('move', 
 
1139
                   self._remote_path(rel_from),
 
1140
                   self._remote_path(rel_to))
 
1141
 
 
1142
    def rmdir(self, relpath):
 
1143
        resp = self._call('rmdir', self._remote_path(relpath))
 
1144
 
 
1145
    def _call(self, method, *args):
 
1146
        resp = self._client._call(method, *args)
 
1147
        self._translate_error(resp)
 
1148
 
 
1149
    def _translate_error(self, resp, orig_path=None):
 
1150
        """Raise an exception from a response"""
 
1151
        if resp is None:
 
1152
            what = None
 
1153
        else:
 
1154
            what = resp[0]
 
1155
        if what == 'ok':
 
1156
            return
 
1157
        elif what == 'NoSuchFile':
 
1158
            if orig_path is not None:
 
1159
                error_path = orig_path
 
1160
            else:
 
1161
                error_path = resp[1]
 
1162
            raise errors.NoSuchFile(error_path)
 
1163
        elif what == 'error':
 
1164
            raise errors.SmartProtocolError(unicode(resp[1]))
 
1165
        elif what == 'FileExists':
 
1166
            raise errors.FileExists(resp[1])
 
1167
        elif what == 'DirectoryNotEmpty':
 
1168
            raise errors.DirectoryNotEmpty(resp[1])
 
1169
        elif what == 'ShortReadvError':
 
1170
            raise errors.ShortReadvError(resp[1], int(resp[2]),
 
1171
                                         int(resp[3]), int(resp[4]))
 
1172
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
 
1173
            encoding = str(resp[1]) # encoding must always be a string
 
1174
            val = resp[2]
 
1175
            start = int(resp[3])
 
1176
            end = int(resp[4])
 
1177
            reason = str(resp[5]) # reason must always be a string
 
1178
            if val.startswith('u:'):
 
1179
                val = val[2:]
 
1180
            elif val.startswith('s:'):
 
1181
                val = val[2:].decode('base64')
 
1182
            if what == 'UnicodeDecodeError':
 
1183
                raise UnicodeDecodeError(encoding, val, start, end, reason)
 
1184
            elif what == 'UnicodeEncodeError':
 
1185
                raise UnicodeEncodeError(encoding, val, start, end, reason)
 
1186
        elif what == "ReadOnlyError":
 
1187
            raise errors.TransportNotPossible('readonly transport')
 
1188
        else:
 
1189
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
 
1190
 
 
1191
    def _send_tuple(self, args):
 
1192
        self._client.accept_bytes(_encode_tuple(args))
 
1193
 
 
1194
    def _recv_tuple(self):
 
1195
        return self._client._recv_tuple()
 
1196
 
 
1197
    def disconnect(self):
 
1198
        self._client.disconnect()
 
1199
 
 
1200
    def delete_tree(self, relpath):
 
1201
        raise errors.TransportNotPossible('readonly transport')
 
1202
 
 
1203
    def stat(self, relpath):
 
1204
        resp = self._client._call('stat', self._remote_path(relpath))
 
1205
        if resp[0] == 'stat':
 
1206
            return SmartStat(int(resp[1]), int(resp[2], 8))
 
1207
        else:
 
1208
            self._translate_error(resp)
 
1209
 
 
1210
    ## def lock_read(self, relpath):
 
1211
    ##     """Lock the given file for shared (read) access.
 
1212
    ##     :return: A lock object, which should be passed to Transport.unlock()
 
1213
    ##     """
 
1214
    ##     # The old RemoteBranch ignore lock for reading, so we will
 
1215
    ##     # continue that tradition and return a bogus lock object.
 
1216
    ##     class BogusLock(object):
 
1217
    ##         def __init__(self, path):
 
1218
    ##             self.path = path
 
1219
    ##         def unlock(self):
 
1220
    ##             pass
 
1221
    ##     return BogusLock(relpath)
 
1222
 
 
1223
    def listable(self):
 
1224
        return True
 
1225
 
 
1226
    def list_dir(self, relpath):
 
1227
        resp = self._client._call('list_dir',
 
1228
                                  self._remote_path(relpath))
 
1229
        if resp[0] == 'names':
 
1230
            return [name.encode('ascii') for name in resp[1:]]
 
1231
        else:
 
1232
            self._translate_error(resp)
 
1233
 
 
1234
    def iter_files_recursive(self):
 
1235
        resp = self._client._call('iter_files_recursive',
 
1236
                                  self._remote_path(''))
 
1237
        if resp[0] == 'names':
 
1238
            return resp[1:]
 
1239
        else:
 
1240
            self._translate_error(resp)
 
1241
 
 
1242
 
 
1243
class SmartStreamClient(SmartProtocolBase):
 
1244
    """Connection to smart server over two streams"""
 
1245
 
 
1246
    def _send_bulk_data(self, body):
 
1247
        self._ensure_connection()
 
1248
        SmartProtocolBase._send_bulk_data(self, body)
 
1249
        
 
1250
    def disconnect(self):
 
1251
        """If this medium maintains a persistent connection, close it.
 
1252
        
 
1253
        The default implementation does nothing.
 
1254
        """
 
1255
        
 
1256
    def _call(self, *args):
 
1257
        bytes = _encode_tuple(args)
 
1258
        # should be self.medium.accept_bytes(bytes) XXX
 
1259
        self.accept_bytes(bytes)
 
1260
        return self._recv_tuple()
 
1261
 
 
1262
    def _call_with_upload(self, method, args, body):
 
1263
        """Call an rpc, supplying bulk upload data.
 
1264
 
 
1265
        :param method: method name to call
 
1266
        :param args: parameter args tuple
 
1267
        :param body: upload body as a byte string
 
1268
        """
 
1269
        bytes = _encode_tuple((method,) + args)
 
1270
        bytes += self._encode_bulk_data(body)
 
1271
        # should be self.medium.accept_bytes XXX
 
1272
        self.accept_bytes(bytes)
 
1273
        return self._recv_tuple()
 
1274
 
 
1275
    def query_version(self):
 
1276
        """Return protocol version number of the server."""
 
1277
        resp = self._call('hello')
 
1278
        if resp == ('ok', '1'):
 
1279
            return 1
 
1280
        else:
 
1281
            raise errors.SmartProtocolError("bad response %r" % (resp,))
 
1282
 
 
1283
 
 
1284
class SmartClientMedium(SmartStreamClient):
 
1285
    """Smart client is a medium for sending smart protocol requests over.
 
1286
 
 
1287
    XXX: we want explicit finalisation
 
1288
    """
 
1289
 
 
1290
 
 
1291
class SmartStreamMediumClient(SmartClientMedium):
 
1292
    """The SmartStreamMediumClient knows how to close the stream when it is
 
1293
    finished with it.
 
1294
    """
 
1295
 
 
1296
    def __del__(self):
 
1297
        self.disconnect()
 
1298
 
 
1299
 
 
1300
class SmartSimplePipesClientMedium(SmartClientMedium):
 
1301
    """A client medium using simple pipes.
 
1302
    
 
1303
    This client does not manage the pipes: it assumes they will always be open.
 
1304
    """
 
1305
 
 
1306
    def __init__(self, readable_pipe, writeable_pipe):
 
1307
        self._readable_pipe = readable_pipe
 
1308
        self._writeable_pipe = writeable_pipe
 
1309
 
 
1310
    def accept_bytes(self, bytes):
 
1311
        """See SmartClientMedium.accept_bytes."""
 
1312
        self._writeable_pipe.write(bytes)
 
1313
 
 
1314
    def read_bytes(self, count):
 
1315
        """See SmartClientMedium.read_bytes."""
 
1316
        return self._readable_pipe.read(count)
 
1317
 
 
1318
    def _recv_bulk(self):
 
1319
        """transitional api from 'client' to 'medium'."""
 
1320
        self._in = self._readable_pipe
 
1321
        try:
 
1322
            return SmartClientMedium._recv_bulk(self)
 
1323
        finally:
 
1324
            self._in = None
 
1325
 
 
1326
    def _recv_tuple(self):
 
1327
        """transitional api from 'client' to 'medium'."""
 
1328
        return _recv_tuple(self._readable_pipe)
 
1329
 
 
1330
    def _write_and_flush(self, bytes, file=None):
 
1331
        """Thunk from the 'client' api to the 'Medium' api."""
 
1332
        assert file is None
 
1333
        self.accept_bytes(bytes)
 
1334
 
 
1335
 
 
1336
class SmartSSHClientMedium(SmartStreamMediumClient):
 
1337
    """A client medium using SSH."""
 
1338
    
 
1339
    def __init__(self, host, port=None, username=None, password=None,
 
1340
            vendor=None):
 
1341
        """Creates a client that will connect on the first use.
 
1342
        
 
1343
        :param vendor: An optional override for the ssh vendor to use. See
 
1344
            bzrlib.transport.ssh for details on ssh vendors.
 
1345
        """
 
1346
        self._connected = False
 
1347
        self._host = host
 
1348
        self._password = password
 
1349
        self._port = port
 
1350
        self._username = username
 
1351
        self._read_from = None
 
1352
        self._ssh_connection = None
 
1353
        self._vendor = vendor
 
1354
        self._write_to = None
 
1355
 
 
1356
    def accept_bytes(self, bytes):
 
1357
        """See SmartClientMedium.accept_bytes."""
 
1358
        self._ensure_connection()
 
1359
        self._write_to.write(bytes)
 
1360
        self._write_to.flush()
 
1361
 
 
1362
    def disconnect(self):
 
1363
        """See SmartClientMedium.disconnect()."""
 
1364
        if not self._connected:
 
1365
            return
 
1366
        self._read_from.close()
 
1367
        self._write_to.close()
 
1368
        self._ssh_connection.close()
 
1369
        self._connected = False
 
1370
 
 
1371
    def _ensure_connection(self):
 
1372
        """Connect this medium if not already connected."""
 
1373
        if self._connected:
 
1374
            return
 
1375
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
1376
        if self._vendor is None:
 
1377
            vendor = ssh._get_ssh_vendor()
 
1378
        else:
 
1379
            vendor = self._vendor
 
1380
        self._ssh_connection = vendor.connect_ssh(self._username,
 
1381
                self._password, self._host, self._port,
 
1382
                command=[executable, 'serve', '--inet', '--directory=/',
 
1383
                         '--allow-writes'])
 
1384
        self._read_from, self._write_to = \
 
1385
            self._ssh_connection.get_filelike_channels()
 
1386
        self._connected = True
 
1387
 
 
1388
    def read_bytes(self, count):
 
1389
        """See SmartClientMedium.read_bytes."""
 
1390
        raise errors.MediumNotConnected(self)
 
1391
 
 
1392
    def _recv_bulk(self):
 
1393
        """transitional api from 'client' to 'medium'."""
 
1394
        self._in = self._read_from
 
1395
        try:
 
1396
            return SmartStreamMediumClient._recv_bulk(self)
 
1397
        finally:
 
1398
            self._in = None
 
1399
 
 
1400
    def _recv_tuple(self):
 
1401
        """transitional api from 'client' to 'medium'."""
 
1402
        return _recv_tuple(self._read_from)
 
1403
 
 
1404
    def _write_and_flush(self, bytes, file=None):
 
1405
        """Thunk from the 'client' api to the 'Medium' api."""
 
1406
        assert file is None
 
1407
        self.accept_bytes(bytes)
 
1408
 
 
1409
 
 
1410
class SmartTCPClientMedium(SmartStreamMediumClient):
 
1411
    """A client medium using TCP."""
 
1412
    
 
1413
    def __init__(self, host, port):
 
1414
        """Creates a client that will connect on the first use."""
 
1415
        self._connected = False
 
1416
        self._host = host
 
1417
        self._port = port
 
1418
        self._socket = None
 
1419
 
 
1420
    def accept_bytes(self, bytes):
 
1421
        """See SmartClientMedium.accept_bytes."""
 
1422
        self._ensure_connection()
 
1423
        self._socket.sendall(bytes)
 
1424
 
 
1425
    def disconnect(self):
 
1426
        """See SmartClientMedium.disconnect()."""
 
1427
        if not self._connected:
 
1428
            return
 
1429
        self._socket.close()
 
1430
        self._socket = None
 
1431
        self._connected = False
 
1432
 
 
1433
    def _ensure_connection(self):
 
1434
        """Connect this medium if not already connected."""
 
1435
        if self._connected:
 
1436
            return
 
1437
        self._socket = socket.socket()
 
1438
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
1439
        result = self._socket.connect_ex((self._host, int(self._port)))
 
1440
        if result:
 
1441
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
 
1442
                    (self._host, self._port, os.strerror(result)))
 
1443
        self._connected = True
 
1444
 
 
1445
    def read_bytes(self, count):
 
1446
        """See SmartClientMedium.read_bytes."""
 
1447
        raise errors.MediumNotConnected(self)
 
1448
 
 
1449
    def _recv_bulk(self):
 
1450
        """transitional api from 'client' to 'medium'."""
 
1451
        self._in = self._socket.makefile('r', 0)
 
1452
        try:
 
1453
            return SmartStreamMediumClient._recv_bulk(self)
 
1454
        finally:
 
1455
            self._in = None
 
1456
 
 
1457
    def _recv_tuple(self):
 
1458
        """transitional api from 'client' to 'medium'."""
 
1459
        return _recv_tuple(self._socket.makefile('r', 0))
 
1460
 
 
1461
    def _write_and_flush(self, bytes, file=None):
 
1462
        """Thunk from the 'client' api to the 'Medium' api."""
 
1463
        assert file is None
 
1464
        self.accept_bytes(bytes)
 
1465
 
 
1466
 
 
1467
class SmartTCPTransport(SmartTransport):
 
1468
    """Connection to smart server over plain tcp.
 
1469
    
 
1470
    This is essentially just a factory to get 'RemoteTransport(url,
 
1471
        SmartTCPClientMedium).
 
1472
    """
 
1473
 
 
1474
    def __init__(self, url):
 
1475
        _scheme, _username, _password, _host, _port, _path = \
 
1476
            transport.split_url(url)
 
1477
        try:
 
1478
            _port = int(_port)
 
1479
        except (ValueError, TypeError), e:
 
1480
            raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
 
1481
        medium = SmartTCPClientMedium(_host, _port)
 
1482
        super(SmartTCPTransport, self).__init__(url, medium=medium)
 
1483
 
 
1484
 
 
1485
try:
 
1486
    from bzrlib.transport import sftp, ssh
 
1487
except errors.ParamikoNotPresent:
 
1488
    # no paramiko, no SSHTransport.
 
1489
    pass
 
1490
else:
 
1491
    class SmartSSHTransport(SmartTransport):
 
1492
        """Connection to smart server over SSH.
 
1493
 
 
1494
        This is essentially just a factory to get 'RemoteTransport(url,
 
1495
            SmartSSHClientMedium).
 
1496
        """
 
1497
 
 
1498
        def __init__(self, url):
 
1499
            _scheme, _username, _password, _host, _port, _path = \
 
1500
                transport.split_url(url)
 
1501
            try:
 
1502
                if _port is not None:
 
1503
                    _port = int(_port)
 
1504
            except (ValueError, TypeError), e:
 
1505
                raise errors.InvalidURL(path=url, extra="invalid port %s" % 
 
1506
                    _port)
 
1507
            medium = SmartSSHClientMedium(_host, _port, _username, _password)
 
1508
            super(SmartSSHTransport, self).__init__(url, medium=medium)
 
1509
 
 
1510
 
 
1511
def get_test_permutations():
 
1512
    """Return (transport, server) permutations for testing."""
 
1513
    ### We may need a little more test framework support to construct an
 
1514
    ### appropriate RemoteTransport in the future.
 
1515
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]