/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

  • Committer: jml at canonical
  • Date: 2007-02-23 06:06:54 UTC
  • mto: (2298.3.1 jam-integration)
  • mto: This revision was merged to the branch mainline in revision 2300.
  • Revision ID: jml@canonical.com-20070223060654-hna3q6wwljswnkh3
Make set_plugins_path search user-specified directories (BZR_PLUGIN_PATH 
or get_default_plugin_path()) before searching in site-wide directories
(dirname(bzrlib.plugins.__file__)).

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""RemoteTransport client for the smart-server.
18
 
 
19
 
This module shouldn't be accessed directly.  The classes defined here should be
20
 
imported from bzrlib.smart.
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Smart-server protocol, client and server.
 
18
 
 
19
Requests are sent as a command and list of arguments, followed by optional
 
20
bulk body data.  Responses are similarly a response and list of arguments,
 
21
followed by bulk body data. ::
 
22
 
 
23
  SEP := '\001'
 
24
    Fields are separated by Ctrl-A.
 
25
  BULK_DATA := CHUNK TRAILER
 
26
    Chunks can be repeated as many times as necessary.
 
27
  CHUNK := CHUNK_LEN CHUNK_BODY
 
28
  CHUNK_LEN := DIGIT+ NEWLINE
 
29
    Gives the number of bytes in the following chunk.
 
30
  CHUNK_BODY := BYTE[chunk_len]
 
31
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
 
32
  SUCCESS_TRAILER := 'done' NEWLINE
 
33
  ERROR_TRAILER := 
 
34
 
 
35
Paths are passed across the network.  The client needs to see a namespace that
 
36
includes any repository that might need to be referenced, and the client needs
 
37
to know about a root directory beyond which it cannot ascend.
 
38
 
 
39
Servers run over ssh will typically want to be able to access any path the user 
 
40
can access.  Public servers on the other hand (which might be over http, ssh
 
41
or tcp) will typically want to restrict access to only a particular directory 
 
42
and its children, so will want to do a software virtual root at that level.
 
43
In other words they'll want to rewrite incoming paths to be under that level
 
44
(and prevent escaping using ../ tricks.)
 
45
 
 
46
URLs that include ~ should probably be passed across to the server verbatim
 
47
and the server can expand them.  This will proably not be meaningful when 
 
48
limited to a directory?
 
49
 
 
50
At the bottom level socket, pipes, HTTP server.  For sockets, we have the idea
 
51
that you have multiple requests and get a read error because the other side did
 
52
shutdown.  For pipes we have read pipe which will have a zero read which marks
 
53
end-of-file.  For HTTP server environment there is not end-of-stream because
 
54
each request coming into the server is independent.
 
55
 
 
56
So we need a wrapper around pipes and sockets to seperate out requests from
 
57
substrate and this will give us a single model which is consist for HTTP,
 
58
sockets and pipes.
 
59
 
 
60
Server-side
 
61
-----------
 
62
 
 
63
 MEDIUM  (factory for protocol, reads bytes & pushes to protocol,
 
64
          uses protocol to detect end-of-request, sends written
 
65
          bytes to client) e.g. socket, pipe, HTTP request handler.
 
66
  ^
 
67
  | bytes.
 
68
  v
 
69
 
 
70
PROTOCOL  (serialization, deserialization)  accepts bytes for one
 
71
          request, decodes according to internal state, pushes
 
72
          structured data to handler.  accepts structured data from
 
73
          handler and encodes and writes to the medium.  factory for
 
74
          handler.
 
75
  ^
 
76
  | structured data
 
77
  v
 
78
 
 
79
HANDLER   (domain logic) accepts structured data, operates state
 
80
          machine until the request can be satisfied,
 
81
          sends structured data to the protocol.
 
82
 
 
83
 
 
84
Client-side
 
85
-----------
 
86
 
 
87
 CLIENT             domain logic, accepts domain requests, generated structured
 
88
                    data, reads structured data from responses and turns into
 
89
                    domain data.  Sends structured data to the protocol.
 
90
                    Operates state machines until the request can be delivered
 
91
                    (e.g. reading from a bundle generated in bzrlib to deliver a
 
92
                    complete request).
 
93
 
 
94
                    Possibly this should just be RemoteBzrDir, RemoteTransport,
 
95
                    ...
 
96
  ^
 
97
  | structured data
 
98
  v
 
99
 
 
100
PROTOCOL  (serialization, deserialization)  accepts structured data for one
 
101
          request, encodes and writes to the medium.  Reads bytes from the
 
102
          medium, decodes and allows the client to read structured data.
 
103
  ^
 
104
  | bytes.
 
105
  v
 
106
 
 
107
 MEDIUM  (accepts bytes from the protocol & delivers to the remote server.
 
108
          Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
21
109
"""
22
110
 
23
 
__all__ = ['RemoteTransport', 'RemoteTCPTransport', 'RemoteSSHTransport']
 
111
 
 
112
# TODO: _translate_error should be on the client, not the transport because
 
113
#     error coding is wire protocol specific.
 
114
 
 
115
# TODO: A plain integer from query_version is too simple; should give some
 
116
# capabilities too?
 
117
 
 
118
# TODO: Server should probably catch exceptions within itself and send them
 
119
# back across the network.  (But shouldn't catch KeyboardInterrupt etc)
 
120
# Also needs to somehow report protocol errors like bad requests.  Need to
 
121
# consider how we'll handle error reporting, e.g. if we get halfway through a
 
122
# bulk transfer and then something goes wrong.
 
123
 
 
124
# TODO: Standard marker at start of request/response lines?
 
125
 
 
126
# TODO: Make each request and response self-validatable, e.g. with checksums.
 
127
#
 
128
# TODO: get/put objects could be changed to gradually read back the data as it
 
129
# comes across the network
 
130
#
 
131
# TODO: What should the server do if it hits an error and has to terminate?
 
132
#
 
133
# TODO: is it useful to allow multiple chunks in the bulk data?
 
134
#
 
135
# TODO: If we get an exception during transmission of bulk data we can't just
 
136
# emit the exception because it won't be seen.
 
137
#   John proposes:  I think it would be worthwhile to have a header on each
 
138
#   chunk, that indicates it is another chunk. Then you can send an 'error'
 
139
#   chunk as long as you finish the previous chunk.
 
140
#
 
141
# TODO: Clone method on Transport; should work up towards parent directory;
 
142
# unclear how this should be stored or communicated to the server... maybe
 
143
# just pass it on all relevant requests?
 
144
#
 
145
# TODO: Better name than clone() for changing between directories.  How about
 
146
# open_dir or change_dir or chdir?
 
147
#
 
148
# TODO: Is it really good to have the notion of current directory within the
 
149
# connection?  Perhaps all Transports should factor out a common connection
 
150
# from the thing that has the directory context?
 
151
#
 
152
# TODO: Pull more things common to sftp and ssh to a higher level.
 
153
#
 
154
# TODO: The server that manages a connection should be quite small and retain
 
155
# minimum state because each of the requests are supposed to be stateless.
 
156
# Then we can write another implementation that maps to http.
 
157
#
 
158
# TODO: What to do when a client connection is garbage collected?  Maybe just
 
159
# abruptly drop the connection?
 
160
#
 
161
# TODO: Server in some cases will need to restrict access to files outside of
 
162
# a particular root directory.  LocalTransport doesn't do anything to stop you
 
163
# ascending above the base directory, so we need to prevent paths
 
164
# containing '..' in either the server or transport layers.  (Also need to
 
165
# consider what happens if someone creates a symlink pointing outside the 
 
166
# directory tree...)
 
167
#
 
168
# TODO: Server should rebase absolute paths coming across the network to put
 
169
# them under the virtual root, if one is in use.  LocalTransport currently
 
170
# doesn't do that; if you give it an absolute path it just uses it.
 
171
 
172
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
 
173
# urlescape them instead.  Indeed possibly this should just literally be
 
174
# http-over-ssh.
 
175
#
 
176
# FIXME: This transport, with several others, has imperfect handling of paths
 
177
# within urls.  It'd probably be better for ".." from a root to raise an error
 
178
# rather than return the same directory as we do at present.
 
179
#
 
180
# TODO: Rather than working at the Transport layer we want a Branch,
 
181
# Repository or BzrDir objects that talk to a server.
 
182
#
 
183
# TODO: Probably want some way for server commands to gradually produce body
 
184
# data rather than passing it as a string; they could perhaps pass an
 
185
# iterator-like callback that will gradually yield data; it probably needs a
 
186
# close() method that will always be closed to do any necessary cleanup.
 
187
#
 
188
# TODO: Split the actual smart server from the ssh encoding of it.
 
189
#
 
190
# TODO: Perhaps support file-level readwrite operations over the transport
 
191
# too.
 
192
#
 
193
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
 
194
# branch doing file-level operations.
 
195
#
24
196
 
25
197
from cStringIO import StringIO
 
198
import os
 
199
import socket
 
200
import tempfile
 
201
import threading
 
202
import urllib
 
203
import urlparse
26
204
 
27
205
from bzrlib import (
28
 
    config,
29
 
    debug,
 
206
    bzrdir,
30
207
    errors,
31
 
    remote,
 
208
    revision,
 
209
    transport,
32
210
    trace,
33
 
    transport,
34
211
    urlutils,
35
212
    )
36
 
from bzrlib.smart import client, medium
37
 
from bzrlib.symbol_versioning import (
38
 
    deprecated_method,
39
 
    )
40
 
 
41
 
 
42
 
class _SmartStat(object):
 
213
from bzrlib.bundle.serializer import write_bundle
 
214
try:
 
215
    from bzrlib.transport import ssh
 
216
except errors.ParamikoNotPresent:
 
217
    # no paramiko.  SmartSSHClientMedium will break.
 
218
    pass
 
219
 
 
220
# must do this otherwise urllib can't parse the urls properly :(
 
221
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh', 'bzr+http']:
 
222
    transport.register_urlparse_netloc_protocol(scheme)
 
223
del scheme
 
224
 
 
225
 
 
226
def _recv_tuple(from_file):
 
227
    req_line = from_file.readline()
 
228
    return _decode_tuple(req_line)
 
229
 
 
230
 
 
231
def _decode_tuple(req_line):
 
232
    if req_line == None or req_line == '':
 
233
        return None
 
234
    if req_line[-1] != '\n':
 
235
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
 
236
    return tuple(req_line[:-1].split('\x01'))
 
237
 
 
238
 
 
239
def _encode_tuple(args):
 
240
    """Encode the tuple args to a bytestream."""
 
241
    return '\x01'.join(args) + '\n'
 
242
 
 
243
 
 
244
class SmartProtocolBase(object):
 
245
    """Methods common to client and server"""
 
246
 
 
247
    # TODO: this only actually accomodates a single block; possibly should
 
248
    # support multiple chunks?
 
249
    def _encode_bulk_data(self, body):
 
250
        """Encode body as a bulk data chunk."""
 
251
        return ''.join(('%d\n' % len(body), body, 'done\n'))
 
252
 
 
253
    def _serialise_offsets(self, offsets):
 
254
        """Serialise a readv offset list."""
 
255
        txt = []
 
256
        for start, length in offsets:
 
257
            txt.append('%d,%d' % (start, length))
 
258
        return '\n'.join(txt)
 
259
        
 
260
 
 
261
class SmartServerRequestProtocolOne(SmartProtocolBase):
 
262
    """Server-side encoding and decoding logic for smart version 1."""
 
263
    
 
264
    def __init__(self, backing_transport, write_func):
 
265
        self._backing_transport = backing_transport
 
266
        self.excess_buffer = ''
 
267
        self._finished = False
 
268
        self.in_buffer = ''
 
269
        self.has_dispatched = False
 
270
        self.request = None
 
271
        self._body_decoder = None
 
272
        self._write_func = write_func
 
273
 
 
274
    def accept_bytes(self, bytes):
 
275
        """Take bytes, and advance the internal state machine appropriately.
 
276
        
 
277
        :param bytes: must be a byte string
 
278
        """
 
279
        assert isinstance(bytes, str)
 
280
        self.in_buffer += bytes
 
281
        if not self.has_dispatched:
 
282
            if '\n' not in self.in_buffer:
 
283
                # no command line yet
 
284
                return
 
285
            self.has_dispatched = True
 
286
            try:
 
287
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
 
288
                first_line += '\n'
 
289
                req_args = _decode_tuple(first_line)
 
290
                self.request = SmartServerRequestHandler(
 
291
                    self._backing_transport)
 
292
                self.request.dispatch_command(req_args[0], req_args[1:])
 
293
                if self.request.finished_reading:
 
294
                    # trivial request
 
295
                    self.excess_buffer = self.in_buffer
 
296
                    self.in_buffer = ''
 
297
                    self._send_response(self.request.response.args,
 
298
                        self.request.response.body)
 
299
            except KeyboardInterrupt:
 
300
                raise
 
301
            except Exception, exception:
 
302
                # everything else: pass to client, flush, and quit
 
303
                self._send_response(('error', str(exception)))
 
304
                return
 
305
 
 
306
        if self.has_dispatched:
 
307
            if self._finished:
 
308
                # nothing to do.XXX: this routine should be a single state 
 
309
                # machine too.
 
310
                self.excess_buffer += self.in_buffer
 
311
                self.in_buffer = ''
 
312
                return
 
313
            if self._body_decoder is None:
 
314
                self._body_decoder = LengthPrefixedBodyDecoder()
 
315
            self._body_decoder.accept_bytes(self.in_buffer)
 
316
            self.in_buffer = self._body_decoder.unused_data
 
317
            body_data = self._body_decoder.read_pending_data()
 
318
            self.request.accept_body(body_data)
 
319
            if self._body_decoder.finished_reading:
 
320
                self.request.end_of_body()
 
321
                assert self.request.finished_reading, \
 
322
                    "no more body, request not finished"
 
323
            if self.request.response is not None:
 
324
                self._send_response(self.request.response.args,
 
325
                    self.request.response.body)
 
326
                self.excess_buffer = self.in_buffer
 
327
                self.in_buffer = ''
 
328
            else:
 
329
                assert not self.request.finished_reading, \
 
330
                    "no response and we have finished reading."
 
331
 
 
332
    def _send_response(self, args, body=None):
 
333
        """Send a smart server response down the output stream."""
 
334
        assert not self._finished, 'response already sent'
 
335
        self._finished = True
 
336
        self._write_func(_encode_tuple(args))
 
337
        if body is not None:
 
338
            assert isinstance(body, str), 'body must be a str'
 
339
            bytes = self._encode_bulk_data(body)
 
340
            self._write_func(bytes)
 
341
 
 
342
    def next_read_size(self):
 
343
        if self._finished:
 
344
            return 0
 
345
        if self._body_decoder is None:
 
346
            return 1
 
347
        else:
 
348
            return self._body_decoder.next_read_size()
 
349
 
 
350
 
 
351
class LengthPrefixedBodyDecoder(object):
 
352
    """Decodes the length-prefixed bulk data."""
 
353
    
 
354
    def __init__(self):
 
355
        self.bytes_left = None
 
356
        self.finished_reading = False
 
357
        self.unused_data = ''
 
358
        self.state_accept = self._state_accept_expecting_length
 
359
        self.state_read = self._state_read_no_data
 
360
        self._in_buffer = ''
 
361
        self._trailer_buffer = ''
 
362
    
 
363
    def accept_bytes(self, bytes):
 
364
        """Decode as much of bytes as possible.
 
365
 
 
366
        If 'bytes' contains too much data it will be appended to
 
367
        self.unused_data.
 
368
 
 
369
        finished_reading will be set when no more data is required.  Further
 
370
        data will be appended to self.unused_data.
 
371
        """
 
372
        # accept_bytes is allowed to change the state
 
373
        current_state = self.state_accept
 
374
        self.state_accept(bytes)
 
375
        while current_state != self.state_accept:
 
376
            current_state = self.state_accept
 
377
            self.state_accept('')
 
378
 
 
379
    def next_read_size(self):
 
380
        if self.bytes_left is not None:
 
381
            # Ideally we want to read all the remainder of the body and the
 
382
            # trailer in one go.
 
383
            return self.bytes_left + 5
 
384
        elif self.state_accept == self._state_accept_reading_trailer:
 
385
            # Just the trailer left
 
386
            return 5 - len(self._trailer_buffer)
 
387
        elif self.state_accept == self._state_accept_expecting_length:
 
388
            # There's still at least 6 bytes left ('\n' to end the length, plus
 
389
            # 'done\n').
 
390
            return 6
 
391
        else:
 
392
            # Reading excess data.  Either way, 1 byte at a time is fine.
 
393
            return 1
 
394
        
 
395
    def read_pending_data(self):
 
396
        """Return any pending data that has been decoded."""
 
397
        return self.state_read()
 
398
 
 
399
    def _state_accept_expecting_length(self, bytes):
 
400
        self._in_buffer += bytes
 
401
        pos = self._in_buffer.find('\n')
 
402
        if pos == -1:
 
403
            return
 
404
        self.bytes_left = int(self._in_buffer[:pos])
 
405
        self._in_buffer = self._in_buffer[pos+1:]
 
406
        self.bytes_left -= len(self._in_buffer)
 
407
        self.state_accept = self._state_accept_reading_body
 
408
        self.state_read = self._state_read_in_buffer
 
409
 
 
410
    def _state_accept_reading_body(self, bytes):
 
411
        self._in_buffer += bytes
 
412
        self.bytes_left -= len(bytes)
 
413
        if self.bytes_left <= 0:
 
414
            # Finished with body
 
415
            if self.bytes_left != 0:
 
416
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
 
417
                self._in_buffer = self._in_buffer[:self.bytes_left]
 
418
            self.bytes_left = None
 
419
            self.state_accept = self._state_accept_reading_trailer
 
420
        
 
421
    def _state_accept_reading_trailer(self, bytes):
 
422
        self._trailer_buffer += bytes
 
423
        # TODO: what if the trailer does not match "done\n"?  Should this raise
 
424
        # a ProtocolViolation exception?
 
425
        if self._trailer_buffer.startswith('done\n'):
 
426
            self.unused_data = self._trailer_buffer[len('done\n'):]
 
427
            self.state_accept = self._state_accept_reading_unused
 
428
            self.finished_reading = True
 
429
    
 
430
    def _state_accept_reading_unused(self, bytes):
 
431
        self.unused_data += bytes
 
432
 
 
433
    def _state_read_no_data(self):
 
434
        return ''
 
435
 
 
436
    def _state_read_in_buffer(self):
 
437
        result = self._in_buffer
 
438
        self._in_buffer = ''
 
439
        return result
 
440
 
 
441
 
 
442
class SmartServerStreamMedium(object):
 
443
    """Handles smart commands coming over a stream.
 
444
 
 
445
    The stream may be a pipe connected to sshd, or a tcp socket, or an
 
446
    in-process fifo for testing.
 
447
 
 
448
    One instance is created for each connected client; it can serve multiple
 
449
    requests in the lifetime of the connection.
 
450
 
 
451
    The server passes requests through to an underlying backing transport, 
 
452
    which will typically be a LocalTransport looking at the server's filesystem.
 
453
    """
 
454
 
 
455
    def __init__(self, backing_transport):
 
456
        """Construct new server.
 
457
 
 
458
        :param backing_transport: Transport for the directory served.
 
459
        """
 
460
        # backing_transport could be passed to serve instead of __init__
 
461
        self.backing_transport = backing_transport
 
462
        self.finished = False
 
463
 
 
464
    def serve(self):
 
465
        """Serve requests until the client disconnects."""
 
466
        # Keep a reference to stderr because the sys module's globals get set to
 
467
        # None during interpreter shutdown.
 
468
        from sys import stderr
 
469
        try:
 
470
            while not self.finished:
 
471
                protocol = SmartServerRequestProtocolOne(self.backing_transport,
 
472
                                                         self._write_out)
 
473
                self._serve_one_request(protocol)
 
474
        except Exception, e:
 
475
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
476
            raise
 
477
 
 
478
    def _serve_one_request(self, protocol):
 
479
        """Read one request from input, process, send back a response.
 
480
        
 
481
        :param protocol: a SmartServerRequestProtocol.
 
482
        """
 
483
        try:
 
484
            self._serve_one_request_unguarded(protocol)
 
485
        except KeyboardInterrupt:
 
486
            raise
 
487
        except Exception, e:
 
488
            self.terminate_due_to_error()
 
489
 
 
490
    def terminate_due_to_error(self):
 
491
        """Called when an unhandled exception from the protocol occurs."""
 
492
        raise NotImplementedError(self.terminate_due_to_error)
 
493
 
 
494
 
 
495
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
 
496
 
 
497
    def __init__(self, sock, backing_transport):
 
498
        """Constructor.
 
499
 
 
500
        :param sock: the socket the server will read from.  It will be put
 
501
            into blocking mode.
 
502
        """
 
503
        SmartServerStreamMedium.__init__(self, backing_transport)
 
504
        self.push_back = ''
 
505
        sock.setblocking(True)
 
506
        self.socket = sock
 
507
 
 
508
    def _serve_one_request_unguarded(self, protocol):
 
509
        while protocol.next_read_size():
 
510
            if self.push_back:
 
511
                protocol.accept_bytes(self.push_back)
 
512
                self.push_back = ''
 
513
            else:
 
514
                bytes = self.socket.recv(4096)
 
515
                if bytes == '':
 
516
                    self.finished = True
 
517
                    return
 
518
                protocol.accept_bytes(bytes)
 
519
        
 
520
        self.push_back = protocol.excess_buffer
 
521
    
 
522
    def terminate_due_to_error(self):
 
523
        """Called when an unhandled exception from the protocol occurs."""
 
524
        # TODO: This should log to a server log file, but no such thing
 
525
        # exists yet.  Andrew Bennetts 2006-09-29.
 
526
        self.socket.close()
 
527
        self.finished = True
 
528
 
 
529
    def _write_out(self, bytes):
 
530
        self.socket.sendall(bytes)
 
531
 
 
532
 
 
533
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
 
534
 
 
535
    def __init__(self, in_file, out_file, backing_transport):
 
536
        """Construct new server.
 
537
 
 
538
        :param in_file: Python file from which requests can be read.
 
539
        :param out_file: Python file to write responses.
 
540
        :param backing_transport: Transport for the directory served.
 
541
        """
 
542
        SmartServerStreamMedium.__init__(self, backing_transport)
 
543
        self._in = in_file
 
544
        self._out = out_file
 
545
 
 
546
    def _serve_one_request_unguarded(self, protocol):
 
547
        while True:
 
548
            bytes_to_read = protocol.next_read_size()
 
549
            if bytes_to_read == 0:
 
550
                # Finished serving this request.
 
551
                self._out.flush()
 
552
                return
 
553
            bytes = self._in.read(bytes_to_read)
 
554
            if bytes == '':
 
555
                # Connection has been closed.
 
556
                self.finished = True
 
557
                self._out.flush()
 
558
                return
 
559
            protocol.accept_bytes(bytes)
 
560
 
 
561
    def terminate_due_to_error(self):
 
562
        # TODO: This should log to a server log file, but no such thing
 
563
        # exists yet.  Andrew Bennetts 2006-09-29.
 
564
        self._out.close()
 
565
        self.finished = True
 
566
 
 
567
    def _write_out(self, bytes):
 
568
        self._out.write(bytes)
 
569
 
 
570
 
 
571
class SmartServerResponse(object):
 
572
    """Response generated by SmartServerRequestHandler."""
 
573
 
 
574
    def __init__(self, args, body=None):
 
575
        self.args = args
 
576
        self.body = body
 
577
 
 
578
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
 
579
# for delivering the data for a request. This could be done with as the
 
580
# StreamServer, though that would create conflation between request and response
 
581
# which may be undesirable.
 
582
 
 
583
 
 
584
class SmartServerRequestHandler(object):
 
585
    """Protocol logic for smart server.
 
586
    
 
587
    This doesn't handle serialization at all, it just processes requests and
 
588
    creates responses.
 
589
    """
 
590
 
 
591
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
 
592
    # not contain encoding or decoding logic to allow the wire protocol to vary
 
593
    # from the object protocol: we will want to tweak the wire protocol separate
 
594
    # from the object model, and ideally we will be able to do that without
 
595
    # having a SmartServerRequestHandler subclass for each wire protocol, rather
 
596
    # just a Protocol subclass.
 
597
 
 
598
    # TODO: Better way of representing the body for commands that take it,
 
599
    # and allow it to be streamed into the server.
 
600
    
 
601
    def __init__(self, backing_transport):
 
602
        self._backing_transport = backing_transport
 
603
        self._converted_command = False
 
604
        self.finished_reading = False
 
605
        self._body_bytes = ''
 
606
        self.response = None
 
607
 
 
608
    def accept_body(self, bytes):
 
609
        """Accept body data.
 
610
 
 
611
        This should be overriden for each command that desired body data to
 
612
        handle the right format of that data. I.e. plain bytes, a bundle etc.
 
613
 
 
614
        The deserialisation into that format should be done in the Protocol
 
615
        object. Set self.desired_body_format to the format your method will
 
616
        handle.
 
617
        """
 
618
        # default fallback is to accumulate bytes.
 
619
        self._body_bytes += bytes
 
620
        
 
621
    def _end_of_body_handler(self):
 
622
        """An unimplemented end of body handler."""
 
623
        raise NotImplementedError(self._end_of_body_handler)
 
624
        
 
625
    def do_hello(self):
 
626
        """Answer a version request with my version."""
 
627
        return SmartServerResponse(('ok', '1'))
 
628
 
 
629
    def do_has(self, relpath):
 
630
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
 
631
        return SmartServerResponse((r,))
 
632
 
 
633
    def do_get(self, relpath):
 
634
        backing_bytes = self._backing_transport.get_bytes(relpath)
 
635
        return SmartServerResponse(('ok',), backing_bytes)
 
636
 
 
637
    def _deserialise_optional_mode(self, mode):
 
638
        # XXX: FIXME this should be on the protocol object.
 
639
        if mode == '':
 
640
            return None
 
641
        else:
 
642
            return int(mode)
 
643
 
 
644
    def do_append(self, relpath, mode):
 
645
        self._converted_command = True
 
646
        self._relpath = relpath
 
647
        self._mode = self._deserialise_optional_mode(mode)
 
648
        self._end_of_body_handler = self._handle_do_append_end
 
649
    
 
650
    def _handle_do_append_end(self):
 
651
        old_length = self._backing_transport.append_bytes(
 
652
            self._relpath, self._body_bytes, self._mode)
 
653
        self.response = SmartServerResponse(('appended', '%d' % old_length))
 
654
 
 
655
    def do_delete(self, relpath):
 
656
        self._backing_transport.delete(relpath)
 
657
 
 
658
    def do_iter_files_recursive(self, relpath):
 
659
        transport = self._backing_transport.clone(relpath)
 
660
        filenames = transport.iter_files_recursive()
 
661
        return SmartServerResponse(('names',) + tuple(filenames))
 
662
 
 
663
    def do_list_dir(self, relpath):
 
664
        filenames = self._backing_transport.list_dir(relpath)
 
665
        return SmartServerResponse(('names',) + tuple(filenames))
 
666
 
 
667
    def do_mkdir(self, relpath, mode):
 
668
        self._backing_transport.mkdir(relpath,
 
669
                                      self._deserialise_optional_mode(mode))
 
670
 
 
671
    def do_move(self, rel_from, rel_to):
 
672
        self._backing_transport.move(rel_from, rel_to)
 
673
 
 
674
    def do_put(self, relpath, mode):
 
675
        self._converted_command = True
 
676
        self._relpath = relpath
 
677
        self._mode = self._deserialise_optional_mode(mode)
 
678
        self._end_of_body_handler = self._handle_do_put
 
679
 
 
680
    def _handle_do_put(self):
 
681
        self._backing_transport.put_bytes(self._relpath,
 
682
                self._body_bytes, self._mode)
 
683
        self.response = SmartServerResponse(('ok',))
 
684
 
 
685
    def _deserialise_offsets(self, text):
 
686
        # XXX: FIXME this should be on the protocol object.
 
687
        offsets = []
 
688
        for line in text.split('\n'):
 
689
            if not line:
 
690
                continue
 
691
            start, length = line.split(',')
 
692
            offsets.append((int(start), int(length)))
 
693
        return offsets
 
694
 
 
695
    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
 
696
        self._converted_command = True
 
697
        self._end_of_body_handler = self._handle_put_non_atomic
 
698
        self._relpath = relpath
 
699
        self._dir_mode = self._deserialise_optional_mode(dir_mode)
 
700
        self._mode = self._deserialise_optional_mode(mode)
 
701
        # a boolean would be nicer XXX
 
702
        self._create_parent = (create_parent == 'T')
 
703
 
 
704
    def _handle_put_non_atomic(self):
 
705
        self._backing_transport.put_bytes_non_atomic(self._relpath,
 
706
                self._body_bytes,
 
707
                mode=self._mode,
 
708
                create_parent_dir=self._create_parent,
 
709
                dir_mode=self._dir_mode)
 
710
        self.response = SmartServerResponse(('ok',))
 
711
 
 
712
    def do_readv(self, relpath):
 
713
        self._converted_command = True
 
714
        self._end_of_body_handler = self._handle_readv_offsets
 
715
        self._relpath = relpath
 
716
 
 
717
    def end_of_body(self):
 
718
        """No more body data will be received."""
 
719
        self._run_handler_code(self._end_of_body_handler, (), {})
 
720
        # cannot read after this.
 
721
        self.finished_reading = True
 
722
 
 
723
    def _handle_readv_offsets(self):
 
724
        """accept offsets for a readv request."""
 
725
        offsets = self._deserialise_offsets(self._body_bytes)
 
726
        backing_bytes = ''.join(bytes for offset, bytes in
 
727
            self._backing_transport.readv(self._relpath, offsets))
 
728
        self.response = SmartServerResponse(('readv',), backing_bytes)
 
729
        
 
730
    def do_rename(self, rel_from, rel_to):
 
731
        self._backing_transport.rename(rel_from, rel_to)
 
732
 
 
733
    def do_rmdir(self, relpath):
 
734
        self._backing_transport.rmdir(relpath)
 
735
 
 
736
    def do_stat(self, relpath):
 
737
        stat = self._backing_transport.stat(relpath)
 
738
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
 
739
        
 
740
    def do_get_bundle(self, path, revision_id):
 
741
        # open transport relative to our base
 
742
        t = self._backing_transport.clone(path)
 
743
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
 
744
        repo = control.open_repository()
 
745
        tmpf = tempfile.TemporaryFile()
 
746
        base_revision = revision.NULL_REVISION
 
747
        write_bundle(repo, revision_id, base_revision, tmpf)
 
748
        tmpf.seek(0)
 
749
        return SmartServerResponse((), tmpf.read())
 
750
 
 
751
    def dispatch_command(self, cmd, args):
 
752
        """Deprecated compatibility method.""" # XXX XXX
 
753
        func = getattr(self, 'do_' + cmd, None)
 
754
        if func is None:
 
755
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
 
756
        self._run_handler_code(func, args, {})
 
757
 
 
758
    def _run_handler_code(self, callable, args, kwargs):
 
759
        """Run some handler specific code 'callable'.
 
760
 
 
761
        If a result is returned, it is considered to be the commands response,
 
762
        and finished_reading is set true, and its assigned to self.response.
 
763
 
 
764
        Any exceptions caught are translated and a response object created
 
765
        from them.
 
766
        """
 
767
        result = self._call_converting_errors(callable, args, kwargs)
 
768
        if result is not None:
 
769
            self.response = result
 
770
            self.finished_reading = True
 
771
        # handle unconverted commands
 
772
        if not self._converted_command:
 
773
            self.finished_reading = True
 
774
            if result is None:
 
775
                self.response = SmartServerResponse(('ok',))
 
776
 
 
777
    def _call_converting_errors(self, callable, args, kwargs):
 
778
        """Call callable converting errors to Response objects."""
 
779
        try:
 
780
            return callable(*args, **kwargs)
 
781
        except errors.NoSuchFile, e:
 
782
            return SmartServerResponse(('NoSuchFile', e.path))
 
783
        except errors.FileExists, e:
 
784
            return SmartServerResponse(('FileExists', e.path))
 
785
        except errors.DirectoryNotEmpty, e:
 
786
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
 
787
        except errors.ShortReadvError, e:
 
788
            return SmartServerResponse(('ShortReadvError',
 
789
                e.path, str(e.offset), str(e.length), str(e.actual)))
 
790
        except UnicodeError, e:
 
791
            # If it is a DecodeError, than most likely we are starting
 
792
            # with a plain string
 
793
            str_or_unicode = e.object
 
794
            if isinstance(str_or_unicode, unicode):
 
795
                # XXX: UTF-8 might have \x01 (our seperator byte) in it.  We
 
796
                # should escape it somehow.
 
797
                val = 'u:' + str_or_unicode.encode('utf-8')
 
798
            else:
 
799
                val = 's:' + str_or_unicode.encode('base64')
 
800
            # This handles UnicodeEncodeError or UnicodeDecodeError
 
801
            return SmartServerResponse((e.__class__.__name__,
 
802
                    e.encoding, val, str(e.start), str(e.end), e.reason))
 
803
        except errors.TransportNotPossible, e:
 
804
            if e.msg == "readonly transport":
 
805
                return SmartServerResponse(('ReadOnlyError', ))
 
806
            else:
 
807
                raise
 
808
 
 
809
 
 
810
class SmartTCPServer(object):
 
811
    """Listens on a TCP socket and accepts connections from smart clients"""
 
812
 
 
813
    def __init__(self, backing_transport, host='127.0.0.1', port=0):
 
814
        """Construct a new server.
 
815
 
 
816
        To actually start it running, call either start_background_thread or
 
817
        serve.
 
818
 
 
819
        :param host: Name of the interface to listen on.
 
820
        :param port: TCP port to listen on, or 0 to allocate a transient port.
 
821
        """
 
822
        self._server_socket = socket.socket()
 
823
        self._server_socket.bind((host, port))
 
824
        self.port = self._server_socket.getsockname()[1]
 
825
        self._server_socket.listen(1)
 
826
        self._server_socket.settimeout(1)
 
827
        self.backing_transport = backing_transport
 
828
 
 
829
    def serve(self):
 
830
        # let connections timeout so that we get a chance to terminate
 
831
        # Keep a reference to the exceptions we want to catch because the socket
 
832
        # module's globals get set to None during interpreter shutdown.
 
833
        from socket import timeout as socket_timeout
 
834
        from socket import error as socket_error
 
835
        self._should_terminate = False
 
836
        while not self._should_terminate:
 
837
            try:
 
838
                self.accept_and_serve()
 
839
            except socket_timeout:
 
840
                # just check if we're asked to stop
 
841
                pass
 
842
            except socket_error, e:
 
843
                trace.warning("client disconnected: %s", e)
 
844
                pass
 
845
 
 
846
    def get_url(self):
 
847
        """Return the url of the server"""
 
848
        return "bzr://%s:%d/" % self._server_socket.getsockname()
 
849
 
 
850
    def accept_and_serve(self):
 
851
        conn, client_addr = self._server_socket.accept()
 
852
        # For WIN32, where the timeout value from the listening socket
 
853
        # propogates to the newly accepted socket.
 
854
        conn.setblocking(True)
 
855
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
856
        handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
 
857
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
 
858
        connection_thread.setDaemon(True)
 
859
        connection_thread.start()
 
860
 
 
861
    def start_background_thread(self):
 
862
        self._server_thread = threading.Thread(None,
 
863
                self.serve,
 
864
                name='server-' + self.get_url())
 
865
        self._server_thread.setDaemon(True)
 
866
        self._server_thread.start()
 
867
 
 
868
    def stop_background_thread(self):
 
869
        self._should_terminate = True
 
870
        # At one point we would wait to join the threads here, but it looks
 
871
        # like they don't actually exit.  So now we just leave them running
 
872
        # and expect to terminate the process. -- mbp 20070215
 
873
        # self._server_socket.close()
 
874
        ## sys.stderr.write("waiting for server thread to finish...")
 
875
        ## self._server_thread.join()
 
876
 
 
877
 
 
878
class SmartTCPServer_for_testing(SmartTCPServer):
 
879
    """Server suitable for use by transport tests.
 
880
    
 
881
    This server is backed by the process's cwd.
 
882
    """
 
883
 
 
884
    def __init__(self):
 
885
        self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
 
886
        # The server is set up by default like for ssh access: the client
 
887
        # passes filesystem-absolute paths; therefore the server must look
 
888
        # them up relative to the root directory.  it might be better to act
 
889
        # a public server and have the server rewrite paths into the test
 
890
        # directory.
 
891
        SmartTCPServer.__init__(self,
 
892
            transport.get_transport(urlutils.local_path_to_url('/')))
 
893
        
 
894
    def setUp(self):
 
895
        """Set up server for testing"""
 
896
        self.start_background_thread()
 
897
 
 
898
    def tearDown(self):
 
899
        self.stop_background_thread()
 
900
 
 
901
    def get_url(self):
 
902
        """Return the url of the server"""
 
903
        host, port = self._server_socket.getsockname()
 
904
        return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
 
905
 
 
906
    def get_bogus_url(self):
 
907
        """Return a URL which will fail to connect"""
 
908
        return 'bzr://127.0.0.1:1/'
 
909
 
 
910
 
 
911
class SmartStat(object):
43
912
 
44
913
    def __init__(self, size, mode):
45
914
        self.st_size = size
46
915
        self.st_mode = mode
47
916
 
48
917
 
49
 
class RemoteTransport(transport.ConnectedTransport):
 
918
class SmartTransport(transport.Transport):
50
919
    """Connection to a smart server.
51
920
 
52
 
    The connection holds references to the medium that can be used to send
53
 
    requests to the server.
 
921
    The connection holds references to pipes that can be used to send requests
 
922
    to the server.
54
923
 
55
924
    The connection has a notion of the current directory to which it's
56
925
    connected; this is incorporated in filenames passed to the server.
57
 
 
58
 
    This supports some higher-level RPC operations and can also be treated
 
926
    
 
927
    This supports some higher-level RPC operations and can also be treated 
59
928
    like a Transport to do file-like operations.
60
929
 
61
 
    The connection can be made over a tcp socket, an ssh pipe or a series of
62
 
    http requests.  There are concrete subclasses for each type:
63
 
    RemoteTCPTransport, etc.
 
930
    The connection can be made over a tcp socket, or (in future) an ssh pipe
 
931
    or a series of http requests.  There are concrete subclasses for each
 
932
    type: SmartTCPTransport, etc.
64
933
    """
65
934
 
66
 
    # When making a readv request, cap it at requesting 5MB of data
67
 
    _max_readv_bytes = 5*1024*1024
68
 
 
69
 
    # IMPORTANT FOR IMPLEMENTORS: RemoteTransport MUST NOT be given encoding
 
935
    # IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
70
936
    # responsibilities: Put those on SmartClient or similar. This is vital for
71
937
    # the ability to support multiple versions of the smart protocol over time:
72
 
    # RemoteTransport is an adapter from the Transport object model to the
 
938
    # SmartTransport is an adapter from the Transport object model to the 
73
939
    # SmartClient model, not an encoder.
74
940
 
75
 
    # FIXME: the medium parameter should be private, only the tests requires
76
 
    # it. It may be even clearer to define a TestRemoteTransport that handles
77
 
    # the specific cases of providing a _client and/or a _medium, and leave
78
 
    # RemoteTransport as an abstract class.
79
 
    def __init__(self, url, _from_transport=None, medium=None, _client=None):
 
941
    def __init__(self, url, clone_from=None, medium=None):
80
942
        """Constructor.
81
943
 
82
 
        :param _from_transport: Another RemoteTransport instance that this
83
 
            one is being cloned from.  Attributes such as the medium will
84
 
            be reused.
85
 
 
86
 
        :param medium: The medium to use for this RemoteTransport.  If None,
87
 
            the medium from the _from_transport is shared.  If both this
88
 
            and _from_transport are None, a new medium will be built.
89
 
            _from_transport and medium cannot both be specified.
90
 
 
91
 
        :param _client: Override the _SmartClient used by this transport.  This
92
 
            should only be used for testing purposes; normally this is
93
 
            determined from the medium.
94
 
        """
95
 
        super(RemoteTransport, self).__init__(
96
 
            url, _from_transport=_from_transport)
97
 
 
98
 
        # The medium is the connection, except when we need to share it with
99
 
        # other objects (RemoteBzrDir, RemoteRepository etc). In these cases
100
 
        # what we want to share is really the shared connection.
101
 
 
102
 
        if (_from_transport is not None
103
 
            and isinstance(_from_transport, RemoteTransport)):
104
 
            _client = _from_transport._client
105
 
        elif _from_transport is None:
106
 
            # If no _from_transport is specified, we need to intialize the
107
 
            # shared medium.
108
 
            credentials = None
109
 
            if medium is None:
110
 
                medium, credentials = self._build_medium()
111
 
                if 'hpss' in debug.debug_flags:
112
 
                    trace.mutter('hpss: Built a new medium: %s',
113
 
                                 medium.__class__.__name__)
114
 
            self._shared_connection = transport._SharedConnection(medium,
115
 
                                                                  credentials,
116
 
                                                                  self.base)
117
 
        elif medium is None:
118
 
            # No medium was specified, so share the medium from the
119
 
            # _from_transport.
120
 
            medium = self._shared_connection.connection
121
 
        else:
122
 
            raise AssertionError(
123
 
                "Both _from_transport (%r) and medium (%r) passed to "
124
 
                "RemoteTransport.__init__, but these parameters are mutally "
125
 
                "exclusive." % (_from_transport, medium))
126
 
 
127
 
        if _client is None:
128
 
            self._client = client._SmartClient(medium)
129
 
        else:
130
 
            self._client = _client
131
 
 
132
 
    def _build_medium(self):
133
 
        """Create the medium if _from_transport does not provide one.
134
 
 
135
 
        The medium is analogous to the connection for ConnectedTransport: it
136
 
        allows connection sharing.
137
 
        """
138
 
        # No credentials
139
 
        return None, None
140
 
 
141
 
    def _report_activity(self, bytes, direction):
142
 
        """See Transport._report_activity.
143
 
 
144
 
        Does nothing; the smart medium will report activity triggered by a
145
 
        RemoteTransport.
146
 
        """
147
 
        pass
 
944
        :param medium: The medium to use for this RemoteTransport. This must be
 
945
            supplied if clone_from is None.
 
946
        """
 
947
        ### Technically super() here is faulty because Transport's __init__
 
948
        ### fails to take 2 parameters, and if super were to choose a silly
 
949
        ### initialisation order things would blow up. 
 
950
        if not url.endswith('/'):
 
951
            url += '/'
 
952
        super(SmartTransport, self).__init__(url)
 
953
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
 
954
                transport.split_url(url)
 
955
        if clone_from is None:
 
956
            self._medium = medium
 
957
        else:
 
958
            # credentials may be stripped from the base in some circumstances
 
959
            # as yet to be clearly defined or documented, so copy them.
 
960
            self._username = clone_from._username
 
961
            # reuse same connection
 
962
            self._medium = clone_from._medium
 
963
        assert self._medium is not None
 
964
 
 
965
    def abspath(self, relpath):
 
966
        """Return the full url to the given relative path.
 
967
        
 
968
        @param relpath: the relative path or path components
 
969
        @type relpath: str or list
 
970
        """
 
971
        return self._unparse_url(self._remote_path(relpath))
 
972
    
 
973
    def clone(self, relative_url):
 
974
        """Make a new SmartTransport related to me, sharing the same connection.
 
975
 
 
976
        This essentially opens a handle on a different remote directory.
 
977
        """
 
978
        if relative_url is None:
 
979
            return SmartTransport(self.base, self)
 
980
        else:
 
981
            return SmartTransport(self.abspath(relative_url), self)
148
982
 
149
983
    def is_readonly(self):
150
984
        """Smart server transport can do read/write file operations."""
151
 
        try:
152
 
            resp = self._call2('Transport.is_readonly')
153
 
        except errors.UnknownSmartMethod:
154
 
            # XXX: nasty hack: servers before 0.16 don't have a
155
 
            # 'Transport.is_readonly' verb, so we do what clients before 0.16
156
 
            # did: assume False.
157
 
            return False
158
 
        if resp == ('yes', ):
159
 
            return True
160
 
        elif resp == ('no', ):
161
 
            return False
162
 
        else:
163
 
            raise errors.UnexpectedSmartServerResponse(resp)
164
 
 
 
985
        return False
 
986
                                                   
165
987
    def get_smart_client(self):
166
 
        return self._get_connection()
 
988
        return self._medium
167
989
 
168
990
    def get_smart_medium(self):
169
 
        return self._get_connection()
 
991
        return self._medium
 
992
                                                   
 
993
    def _unparse_url(self, path):
 
994
        """Return URL for a path.
 
995
 
 
996
        :see: SFTPUrlHandling._unparse_url
 
997
        """
 
998
        # TODO: Eventually it should be possible to unify this with
 
999
        # SFTPUrlHandling._unparse_url?
 
1000
        if path == '':
 
1001
            path = '/'
 
1002
        path = urllib.quote(path)
 
1003
        netloc = urllib.quote(self._host)
 
1004
        if self._username is not None:
 
1005
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
 
1006
        if self._port is not None:
 
1007
            netloc = '%s:%d' % (netloc, self._port)
 
1008
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
170
1009
 
171
1010
    def _remote_path(self, relpath):
172
1011
        """Returns the Unicode version of the absolute path for relpath."""
174
1013
 
175
1014
    def _call(self, method, *args):
176
1015
        resp = self._call2(method, *args)
177
 
        self._ensure_ok(resp)
 
1016
        self._translate_error(resp)
178
1017
 
179
1018
    def _call2(self, method, *args):
180
1019
        """Call a method on the remote server."""
181
 
        try:
182
 
            return self._client.call(method, *args)
183
 
        except errors.ErrorFromSmartServer, err:
184
 
            # The first argument, if present, is always a path.
185
 
            if args:
186
 
                context = {'relpath': args[0]}
187
 
            else:
188
 
                context = {}
189
 
            self._translate_error(err, **context)
 
1020
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1021
        protocol.call(method, *args)
 
1022
        return protocol.read_response_tuple()
190
1023
 
191
1024
    def _call_with_body_bytes(self, method, args, body):
192
1025
        """Call a method on the remote server with body bytes."""
193
 
        try:
194
 
            return self._client.call_with_body_bytes(method, args, body)
195
 
        except errors.ErrorFromSmartServer, err:
196
 
            # The first argument, if present, is always a path.
197
 
            if args:
198
 
                context = {'relpath': args[0]}
199
 
            else:
200
 
                context = {}
201
 
            self._translate_error(err, **context)
 
1026
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1027
        protocol.call_with_body_bytes((method, ) + args, body)
 
1028
        return protocol.read_response_tuple()
202
1029
 
203
1030
    def has(self, relpath):
204
1031
        """Indicate whether a remote file of the given name exists or not.
211
1038
        elif resp == ('no', ):
212
1039
            return False
213
1040
        else:
214
 
            raise errors.UnexpectedSmartServerResponse(resp)
 
1041
            self._translate_error(resp)
215
1042
 
216
1043
    def get(self, relpath):
217
1044
        """Return file-like object reading the contents of a remote file.
218
 
 
 
1045
        
219
1046
        :see: Transport.get_bytes()/get_file()
220
1047
        """
221
1048
        return StringIO(self.get_bytes(relpath))
222
1049
 
223
1050
    def get_bytes(self, relpath):
224
1051
        remote = self._remote_path(relpath)
225
 
        try:
226
 
            resp, response_handler = self._client.call_expecting_body('get', remote)
227
 
        except errors.ErrorFromSmartServer, err:
228
 
            self._translate_error(err, relpath)
 
1052
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1053
        protocol.call('get', remote)
 
1054
        resp = protocol.read_response_tuple(True)
229
1055
        if resp != ('ok', ):
230
 
            response_handler.cancel_read_body()
231
 
            raise errors.UnexpectedSmartServerResponse(resp)
232
 
        return response_handler.read_body_bytes()
 
1056
            protocol.cancel_read_body()
 
1057
            self._translate_error(resp, relpath)
 
1058
        return protocol.read_body_bytes()
233
1059
 
234
1060
    def _serialise_optional_mode(self, mode):
235
1061
        if mode is None:
240
1066
    def mkdir(self, relpath, mode=None):
241
1067
        resp = self._call2('mkdir', self._remote_path(relpath),
242
1068
            self._serialise_optional_mode(mode))
243
 
 
244
 
    def open_write_stream(self, relpath, mode=None):
245
 
        """See Transport.open_write_stream."""
246
 
        self.put_bytes(relpath, "", mode)
247
 
        result = transport.AppendBasedFileStream(self, relpath)
248
 
        transport._file_streams[self.abspath(relpath)] = result
249
 
        return result
 
1069
        self._translate_error(resp)
250
1070
 
251
1071
    def put_bytes(self, relpath, upload_contents, mode=None):
252
1072
        # FIXME: upload_file is probably not safe for non-ascii characters -
253
1073
        # should probably just pass all parameters as length-delimited
254
1074
        # strings?
255
 
        if type(upload_contents) is unicode:
256
 
            # Although not strictly correct, we raise UnicodeEncodeError to be
257
 
            # compatible with other transports.
258
 
            raise UnicodeEncodeError(
259
 
                'undefined', upload_contents, 0, 1,
260
 
                'put_bytes must be given bytes, not unicode.')
261
1075
        resp = self._call_with_body_bytes('put',
262
1076
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
263
1077
            upload_contents)
264
 
        self._ensure_ok(resp)
265
 
        return len(upload_contents)
 
1078
        self._translate_error(resp)
266
1079
 
267
1080
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
268
1081
                             create_parent_dir=False,
278
1091
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
279
1092
             create_parent_str, self._serialise_optional_mode(dir_mode)),
280
1093
            bytes)
281
 
        self._ensure_ok(resp)
 
1094
        self._translate_error(resp)
282
1095
 
283
1096
    def put_file(self, relpath, upload_file, mode=None):
284
1097
        # its not ideal to seek back, but currently put_non_atomic_file depends
300
1113
 
301
1114
    def append_file(self, relpath, from_file, mode=None):
302
1115
        return self.append_bytes(relpath, from_file.read(), mode)
303
 
 
 
1116
        
304
1117
    def append_bytes(self, relpath, bytes, mode=None):
305
1118
        resp = self._call_with_body_bytes(
306
1119
            'append',
308
1121
            bytes)
309
1122
        if resp[0] == 'appended':
310
1123
            return int(resp[1])
311
 
        raise errors.UnexpectedSmartServerResponse(resp)
 
1124
        self._translate_error(resp)
312
1125
 
313
1126
    def delete(self, relpath):
314
1127
        resp = self._call2('delete', self._remote_path(relpath))
315
 
        self._ensure_ok(resp)
316
 
 
317
 
    def external_url(self):
318
 
        """See bzrlib.transport.Transport.external_url."""
319
 
        # the external path for RemoteTransports is the base
320
 
        return self.base
321
 
 
322
 
    def recommended_page_size(self):
323
 
        """Return the recommended page size for this transport."""
324
 
        return 64 * 1024
325
 
 
326
 
    def _readv(self, relpath, offsets):
 
1128
        self._translate_error(resp)
 
1129
 
 
1130
    def readv(self, relpath, offsets):
327
1131
        if not offsets:
328
1132
            return
329
1133
 
330
1134
        offsets = list(offsets)
331
1135
 
332
1136
        sorted_offsets = sorted(offsets)
 
1137
        # turn the list of offsets into a stack
 
1138
        offset_stack = iter(offsets)
 
1139
        cur_offset_and_size = offset_stack.next()
333
1140
        coalesced = list(self._coalesce_offsets(sorted_offsets,
334
1141
                               limit=self._max_readv_combine,
335
 
                               fudge_factor=self._bytes_to_read_before_seek,
336
 
                               max_size=self._max_readv_bytes))
337
 
 
338
 
        # now that we've coallesced things, avoid making enormous requests
339
 
        requests = []
340
 
        cur_request = []
341
 
        cur_len = 0
342
 
        for c in coalesced:
343
 
            if c.length + cur_len > self._max_readv_bytes:
344
 
                requests.append(cur_request)
345
 
                cur_request = [c]
346
 
                cur_len = c.length
347
 
                continue
348
 
            cur_request.append(c)
349
 
            cur_len += c.length
350
 
        if cur_request:
351
 
            requests.append(cur_request)
352
 
        if 'hpss' in debug.debug_flags:
353
 
            trace.mutter('%s.readv %s offsets => %s coalesced'
354
 
                         ' => %s requests (%s)',
355
 
                         self.__class__.__name__, len(offsets), len(coalesced),
356
 
                         len(requests), sum(map(len, requests)))
 
1142
                               fudge_factor=self._bytes_to_read_before_seek))
 
1143
 
 
1144
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1145
        protocol.call_with_body_readv_array(
 
1146
            ('readv', self._remote_path(relpath)),
 
1147
            [(c.start, c.length) for c in coalesced])
 
1148
        resp = protocol.read_response_tuple(True)
 
1149
 
 
1150
        if resp[0] != 'readv':
 
1151
            # This should raise an exception
 
1152
            protocol.cancel_read_body()
 
1153
            self._translate_error(resp)
 
1154
            return
 
1155
 
 
1156
        # FIXME: this should know how many bytes are needed, for clarity.
 
1157
        data = protocol.read_body_bytes()
357
1158
        # Cache the results, but only until they have been fulfilled
358
1159
        data_map = {}
359
 
        # turn the list of offsets into a single stack to iterate
360
 
        offset_stack = iter(offsets)
361
 
        # using a list so it can be modified when passing down and coming back
362
 
        next_offset = [offset_stack.next()]
363
 
        for cur_request in requests:
364
 
            try:
365
 
                result = self._client.call_with_body_readv_array(
366
 
                    ('readv', self._remote_path(relpath),),
367
 
                    [(c.start, c.length) for c in cur_request])
368
 
                resp, response_handler = result
369
 
            except errors.ErrorFromSmartServer, err:
370
 
                self._translate_error(err, relpath)
371
 
 
372
 
            if resp[0] != 'readv':
373
 
                # This should raise an exception
374
 
                response_handler.cancel_read_body()
375
 
                raise errors.UnexpectedSmartServerResponse(resp)
376
 
 
377
 
            for res in self._handle_response(offset_stack, cur_request,
378
 
                                             response_handler,
379
 
                                             data_map,
380
 
                                             next_offset):
381
 
                yield res
382
 
 
383
 
    def _handle_response(self, offset_stack, coalesced, response_handler,
384
 
                         data_map, next_offset):
385
 
        cur_offset_and_size = next_offset[0]
386
 
        # FIXME: this should know how many bytes are needed, for clarity.
387
 
        data = response_handler.read_body_bytes()
388
 
        data_offset = 0
389
1160
        for c_offset in coalesced:
390
1161
            if len(data) < c_offset.length:
391
1162
                raise errors.ShortReadvError(relpath, c_offset.start,
392
1163
                            c_offset.length, actual=len(data))
393
1164
            for suboffset, subsize in c_offset.ranges:
394
1165
                key = (c_offset.start+suboffset, subsize)
395
 
                this_data = data[data_offset+suboffset:
396
 
                                 data_offset+suboffset+subsize]
397
 
                # Special case when the data is in-order, rather than packing
398
 
                # into a map and then back out again. Benchmarking shows that
399
 
                # this has 100% hit rate, but leave in the data_map work just
400
 
                # in case.
401
 
                # TODO: Could we get away with using buffer() to avoid the
402
 
                #       memory copy?  Callers would need to realize they may
403
 
                #       not have a real string.
404
 
                if key == cur_offset_and_size:
405
 
                    yield cur_offset_and_size[0], this_data
406
 
                    cur_offset_and_size = next_offset[0] = offset_stack.next()
407
 
                else:
408
 
                    data_map[key] = this_data
409
 
            data_offset += c_offset.length
 
1166
                data_map[key] = data[suboffset:suboffset+subsize]
 
1167
            data = data[c_offset.length:]
410
1168
 
411
1169
            # Now that we've read some data, see if we can yield anything back
412
1170
            while cur_offset_and_size in data_map:
413
1171
                this_data = data_map.pop(cur_offset_and_size)
414
1172
                yield cur_offset_and_size[0], this_data
415
 
                cur_offset_and_size = next_offset[0] = offset_stack.next()
 
1173
                cur_offset_and_size = offset_stack.next()
416
1174
 
417
1175
    def rename(self, rel_from, rel_to):
418
1176
        self._call('rename',
427
1185
    def rmdir(self, relpath):
428
1186
        resp = self._call('rmdir', self._remote_path(relpath))
429
1187
 
430
 
    def _ensure_ok(self, resp):
431
 
        if resp[0] != 'ok':
432
 
            raise errors.UnexpectedSmartServerResponse(resp)
433
 
 
434
 
    def _translate_error(self, err, relpath=None):
435
 
        remote._translate_error(err, path=relpath)
 
1188
    def _translate_error(self, resp, orig_path=None):
 
1189
        """Raise an exception from a response"""
 
1190
        if resp is None:
 
1191
            what = None
 
1192
        else:
 
1193
            what = resp[0]
 
1194
        if what == 'ok':
 
1195
            return
 
1196
        elif what == 'NoSuchFile':
 
1197
            if orig_path is not None:
 
1198
                error_path = orig_path
 
1199
            else:
 
1200
                error_path = resp[1]
 
1201
            raise errors.NoSuchFile(error_path)
 
1202
        elif what == 'error':
 
1203
            raise errors.SmartProtocolError(unicode(resp[1]))
 
1204
        elif what == 'FileExists':
 
1205
            raise errors.FileExists(resp[1])
 
1206
        elif what == 'DirectoryNotEmpty':
 
1207
            raise errors.DirectoryNotEmpty(resp[1])
 
1208
        elif what == 'ShortReadvError':
 
1209
            raise errors.ShortReadvError(resp[1], int(resp[2]),
 
1210
                                         int(resp[3]), int(resp[4]))
 
1211
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
 
1212
            encoding = str(resp[1]) # encoding must always be a string
 
1213
            val = resp[2]
 
1214
            start = int(resp[3])
 
1215
            end = int(resp[4])
 
1216
            reason = str(resp[5]) # reason must always be a string
 
1217
            if val.startswith('u:'):
 
1218
                val = val[2:].decode('utf-8')
 
1219
            elif val.startswith('s:'):
 
1220
                val = val[2:].decode('base64')
 
1221
            if what == 'UnicodeDecodeError':
 
1222
                raise UnicodeDecodeError(encoding, val, start, end, reason)
 
1223
            elif what == 'UnicodeEncodeError':
 
1224
                raise UnicodeEncodeError(encoding, val, start, end, reason)
 
1225
        elif what == "ReadOnlyError":
 
1226
            raise errors.TransportNotPossible('readonly transport')
 
1227
        else:
 
1228
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
436
1229
 
437
1230
    def disconnect(self):
438
 
        self.get_smart_medium().disconnect()
 
1231
        self._medium.disconnect()
 
1232
 
 
1233
    def delete_tree(self, relpath):
 
1234
        raise errors.TransportNotPossible('readonly transport')
439
1235
 
440
1236
    def stat(self, relpath):
441
1237
        resp = self._call2('stat', self._remote_path(relpath))
442
1238
        if resp[0] == 'stat':
443
 
            return _SmartStat(int(resp[1]), int(resp[2], 8))
444
 
        raise errors.UnexpectedSmartServerResponse(resp)
 
1239
            return SmartStat(int(resp[1]), int(resp[2], 8))
 
1240
        else:
 
1241
            self._translate_error(resp)
445
1242
 
446
1243
    ## def lock_read(self, relpath):
447
1244
    ##     """Lock the given file for shared (read) access.
463
1260
        resp = self._call2('list_dir', self._remote_path(relpath))
464
1261
        if resp[0] == 'names':
465
1262
            return [name.encode('ascii') for name in resp[1:]]
466
 
        raise errors.UnexpectedSmartServerResponse(resp)
 
1263
        else:
 
1264
            self._translate_error(resp)
467
1265
 
468
1266
    def iter_files_recursive(self):
469
1267
        resp = self._call2('iter_files_recursive', self._remote_path(''))
470
1268
        if resp[0] == 'names':
471
1269
            return resp[1:]
472
 
        raise errors.UnexpectedSmartServerResponse(resp)
473
 
 
474
 
 
475
 
class RemoteTCPTransport(RemoteTransport):
 
1270
        else:
 
1271
            self._translate_error(resp)
 
1272
 
 
1273
 
 
1274
class SmartClientMediumRequest(object):
 
1275
    """A request on a SmartClientMedium.
 
1276
 
 
1277
    Each request allows bytes to be provided to it via accept_bytes, and then
 
1278
    the response bytes to be read via read_bytes.
 
1279
 
 
1280
    For instance:
 
1281
    request.accept_bytes('123')
 
1282
    request.finished_writing()
 
1283
    result = request.read_bytes(3)
 
1284
    request.finished_reading()
 
1285
 
 
1286
    It is up to the individual SmartClientMedium whether multiple concurrent
 
1287
    requests can exist. See SmartClientMedium.get_request to obtain instances 
 
1288
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
1289
    details on concurrency and pipelining.
 
1290
    """
 
1291
 
 
1292
    def __init__(self, medium):
 
1293
        """Construct a SmartClientMediumRequest for the medium medium."""
 
1294
        self._medium = medium
 
1295
        # we track state by constants - we may want to use the same
 
1296
        # pattern as BodyReader if it gets more complex.
 
1297
        # valid states are: "writing", "reading", "done"
 
1298
        self._state = "writing"
 
1299
 
 
1300
    def accept_bytes(self, bytes):
 
1301
        """Accept bytes for inclusion in this request.
 
1302
 
 
1303
        This method may not be be called after finished_writing() has been
 
1304
        called.  It depends upon the Medium whether or not the bytes will be
 
1305
        immediately transmitted. Message based Mediums will tend to buffer the
 
1306
        bytes until finished_writing() is called.
 
1307
 
 
1308
        :param bytes: A bytestring.
 
1309
        """
 
1310
        if self._state != "writing":
 
1311
            raise errors.WritingCompleted(self)
 
1312
        self._accept_bytes(bytes)
 
1313
 
 
1314
    def _accept_bytes(self, bytes):
 
1315
        """Helper for accept_bytes.
 
1316
 
 
1317
        Accept_bytes checks the state of the request to determing if bytes
 
1318
        should be accepted. After that it hands off to _accept_bytes to do the
 
1319
        actual acceptance.
 
1320
        """
 
1321
        raise NotImplementedError(self._accept_bytes)
 
1322
 
 
1323
    def finished_reading(self):
 
1324
        """Inform the request that all desired data has been read.
 
1325
 
 
1326
        This will remove the request from the pipeline for its medium (if the
 
1327
        medium supports pipelining) and any further calls to methods on the
 
1328
        request will raise ReadingCompleted.
 
1329
        """
 
1330
        if self._state == "writing":
 
1331
            raise errors.WritingNotComplete(self)
 
1332
        if self._state != "reading":
 
1333
            raise errors.ReadingCompleted(self)
 
1334
        self._state = "done"
 
1335
        self._finished_reading()
 
1336
 
 
1337
    def _finished_reading(self):
 
1338
        """Helper for finished_reading.
 
1339
 
 
1340
        finished_reading checks the state of the request to determine if 
 
1341
        finished_reading is allowed, and if it is hands off to _finished_reading
 
1342
        to perform the action.
 
1343
        """
 
1344
        raise NotImplementedError(self._finished_reading)
 
1345
 
 
1346
    def finished_writing(self):
 
1347
        """Finish the writing phase of this request.
 
1348
 
 
1349
        This will flush all pending data for this request along the medium.
 
1350
        After calling finished_writing, you may not call accept_bytes anymore.
 
1351
        """
 
1352
        if self._state != "writing":
 
1353
            raise errors.WritingCompleted(self)
 
1354
        self._state = "reading"
 
1355
        self._finished_writing()
 
1356
 
 
1357
    def _finished_writing(self):
 
1358
        """Helper for finished_writing.
 
1359
 
 
1360
        finished_writing checks the state of the request to determine if 
 
1361
        finished_writing is allowed, and if it is hands off to _finished_writing
 
1362
        to perform the action.
 
1363
        """
 
1364
        raise NotImplementedError(self._finished_writing)
 
1365
 
 
1366
    def read_bytes(self, count):
 
1367
        """Read bytes from this requests response.
 
1368
 
 
1369
        This method will block and wait for count bytes to be read. It may not
 
1370
        be invoked until finished_writing() has been called - this is to ensure
 
1371
        a message-based approach to requests, for compatability with message
 
1372
        based mediums like HTTP.
 
1373
        """
 
1374
        if self._state == "writing":
 
1375
            raise errors.WritingNotComplete(self)
 
1376
        if self._state != "reading":
 
1377
            raise errors.ReadingCompleted(self)
 
1378
        return self._read_bytes(count)
 
1379
 
 
1380
    def _read_bytes(self, count):
 
1381
        """Helper for read_bytes.
 
1382
 
 
1383
        read_bytes checks the state of the request to determing if bytes
 
1384
        should be read. After that it hands off to _read_bytes to do the
 
1385
        actual read.
 
1386
        """
 
1387
        raise NotImplementedError(self._read_bytes)
 
1388
 
 
1389
 
 
1390
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
 
1391
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
 
1392
 
 
1393
    def __init__(self, medium):
 
1394
        SmartClientMediumRequest.__init__(self, medium)
 
1395
        # check that we are safe concurrency wise. If some streams start
 
1396
        # allowing concurrent requests - i.e. via multiplexing - then this
 
1397
        # assert should be moved to SmartClientStreamMedium.get_request,
 
1398
        # and the setting/unsetting of _current_request likewise moved into
 
1399
        # that class : but its unneeded overhead for now. RBC 20060922
 
1400
        if self._medium._current_request is not None:
 
1401
            raise errors.TooManyConcurrentRequests(self._medium)
 
1402
        self._medium._current_request = self
 
1403
 
 
1404
    def _accept_bytes(self, bytes):
 
1405
        """See SmartClientMediumRequest._accept_bytes.
 
1406
        
 
1407
        This forwards to self._medium._accept_bytes because we are operating
 
1408
        on the mediums stream.
 
1409
        """
 
1410
        self._medium._accept_bytes(bytes)
 
1411
 
 
1412
    def _finished_reading(self):
 
1413
        """See SmartClientMediumRequest._finished_reading.
 
1414
 
 
1415
        This clears the _current_request on self._medium to allow a new 
 
1416
        request to be created.
 
1417
        """
 
1418
        assert self._medium._current_request is self
 
1419
        self._medium._current_request = None
 
1420
        
 
1421
    def _finished_writing(self):
 
1422
        """See SmartClientMediumRequest._finished_writing.
 
1423
 
 
1424
        This invokes self._medium._flush to ensure all bytes are transmitted.
 
1425
        """
 
1426
        self._medium._flush()
 
1427
 
 
1428
    def _read_bytes(self, count):
 
1429
        """See SmartClientMediumRequest._read_bytes.
 
1430
        
 
1431
        This forwards to self._medium._read_bytes because we are operating
 
1432
        on the mediums stream.
 
1433
        """
 
1434
        return self._medium._read_bytes(count)
 
1435
 
 
1436
 
 
1437
class SmartClientRequestProtocolOne(SmartProtocolBase):
 
1438
    """The client-side protocol for smart version 1."""
 
1439
 
 
1440
    def __init__(self, request):
 
1441
        """Construct a SmartClientRequestProtocolOne.
 
1442
 
 
1443
        :param request: A SmartClientMediumRequest to serialise onto and
 
1444
            deserialise from.
 
1445
        """
 
1446
        self._request = request
 
1447
        self._body_buffer = None
 
1448
 
 
1449
    def call(self, *args):
 
1450
        bytes = _encode_tuple(args)
 
1451
        self._request.accept_bytes(bytes)
 
1452
        self._request.finished_writing()
 
1453
 
 
1454
    def call_with_body_bytes(self, args, body):
 
1455
        """Make a remote call of args with body bytes 'body'.
 
1456
 
 
1457
        After calling this, call read_response_tuple to find the result out.
 
1458
        """
 
1459
        bytes = _encode_tuple(args)
 
1460
        self._request.accept_bytes(bytes)
 
1461
        bytes = self._encode_bulk_data(body)
 
1462
        self._request.accept_bytes(bytes)
 
1463
        self._request.finished_writing()
 
1464
 
 
1465
    def call_with_body_readv_array(self, args, body):
 
1466
        """Make a remote call with a readv array.
 
1467
 
 
1468
        The body is encoded with one line per readv offset pair. The numbers in
 
1469
        each pair are separated by a comma, and no trailing \n is emitted.
 
1470
        """
 
1471
        bytes = _encode_tuple(args)
 
1472
        self._request.accept_bytes(bytes)
 
1473
        readv_bytes = self._serialise_offsets(body)
 
1474
        bytes = self._encode_bulk_data(readv_bytes)
 
1475
        self._request.accept_bytes(bytes)
 
1476
        self._request.finished_writing()
 
1477
 
 
1478
    def cancel_read_body(self):
 
1479
        """After expecting a body, a response code may indicate one otherwise.
 
1480
 
 
1481
        This method lets the domain client inform the protocol that no body
 
1482
        will be transmitted. This is a terminal method: after calling it the
 
1483
        protocol is not able to be used further.
 
1484
        """
 
1485
        self._request.finished_reading()
 
1486
 
 
1487
    def read_response_tuple(self, expect_body=False):
 
1488
        """Read a response tuple from the wire.
 
1489
 
 
1490
        This should only be called once.
 
1491
        """
 
1492
        result = self._recv_tuple()
 
1493
        if not expect_body:
 
1494
            self._request.finished_reading()
 
1495
        return result
 
1496
 
 
1497
    def read_body_bytes(self, count=-1):
 
1498
        """Read bytes from the body, decoding into a byte stream.
 
1499
        
 
1500
        We read all bytes at once to ensure we've checked the trailer for 
 
1501
        errors, and then feed the buffer back as read_body_bytes is called.
 
1502
        """
 
1503
        if self._body_buffer is not None:
 
1504
            return self._body_buffer.read(count)
 
1505
        _body_decoder = LengthPrefixedBodyDecoder()
 
1506
 
 
1507
        while not _body_decoder.finished_reading:
 
1508
            bytes_wanted = _body_decoder.next_read_size()
 
1509
            bytes = self._request.read_bytes(bytes_wanted)
 
1510
            _body_decoder.accept_bytes(bytes)
 
1511
        self._request.finished_reading()
 
1512
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
 
1513
        # XXX: TODO check the trailer result.
 
1514
        return self._body_buffer.read(count)
 
1515
 
 
1516
    def _recv_tuple(self):
 
1517
        """Receive a tuple from the medium request."""
 
1518
        line = ''
 
1519
        while not line or line[-1] != '\n':
 
1520
            # TODO: this is inefficient - but tuples are short.
 
1521
            new_char = self._request.read_bytes(1)
 
1522
            line += new_char
 
1523
            assert new_char != '', "end of file reading from server."
 
1524
        return _decode_tuple(line)
 
1525
 
 
1526
    def query_version(self):
 
1527
        """Return protocol version number of the server."""
 
1528
        self.call('hello')
 
1529
        resp = self.read_response_tuple()
 
1530
        if resp == ('ok', '1'):
 
1531
            return 1
 
1532
        else:
 
1533
            raise errors.SmartProtocolError("bad response %r" % (resp,))
 
1534
 
 
1535
 
 
1536
class SmartClientMedium(object):
 
1537
    """Smart client is a medium for sending smart protocol requests over."""
 
1538
 
 
1539
    def disconnect(self):
 
1540
        """If this medium maintains a persistent connection, close it.
 
1541
        
 
1542
        The default implementation does nothing.
 
1543
        """
 
1544
        
 
1545
 
 
1546
class SmartClientStreamMedium(SmartClientMedium):
 
1547
    """Stream based medium common class.
 
1548
 
 
1549
    SmartClientStreamMediums operate on a stream. All subclasses use a common
 
1550
    SmartClientStreamMediumRequest for their requests, and should implement
 
1551
    _accept_bytes and _read_bytes to allow the request objects to send and
 
1552
    receive bytes.
 
1553
    """
 
1554
 
 
1555
    def __init__(self):
 
1556
        self._current_request = None
 
1557
 
 
1558
    def accept_bytes(self, bytes):
 
1559
        self._accept_bytes(bytes)
 
1560
 
 
1561
    def __del__(self):
 
1562
        """The SmartClientStreamMedium knows how to close the stream when it is
 
1563
        finished with it.
 
1564
        """
 
1565
        self.disconnect()
 
1566
 
 
1567
    def _flush(self):
 
1568
        """Flush the output stream.
 
1569
        
 
1570
        This method is used by the SmartClientStreamMediumRequest to ensure that
 
1571
        all data for a request is sent, to avoid long timeouts or deadlocks.
 
1572
        """
 
1573
        raise NotImplementedError(self._flush)
 
1574
 
 
1575
    def get_request(self):
 
1576
        """See SmartClientMedium.get_request().
 
1577
 
 
1578
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
 
1579
        for get_request.
 
1580
        """
 
1581
        return SmartClientStreamMediumRequest(self)
 
1582
 
 
1583
    def read_bytes(self, count):
 
1584
        return self._read_bytes(count)
 
1585
 
 
1586
 
 
1587
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
 
1588
    """A client medium using simple pipes.
 
1589
    
 
1590
    This client does not manage the pipes: it assumes they will always be open.
 
1591
    """
 
1592
 
 
1593
    def __init__(self, readable_pipe, writeable_pipe):
 
1594
        SmartClientStreamMedium.__init__(self)
 
1595
        self._readable_pipe = readable_pipe
 
1596
        self._writeable_pipe = writeable_pipe
 
1597
 
 
1598
    def _accept_bytes(self, bytes):
 
1599
        """See SmartClientStreamMedium.accept_bytes."""
 
1600
        self._writeable_pipe.write(bytes)
 
1601
 
 
1602
    def _flush(self):
 
1603
        """See SmartClientStreamMedium._flush()."""
 
1604
        self._writeable_pipe.flush()
 
1605
 
 
1606
    def _read_bytes(self, count):
 
1607
        """See SmartClientStreamMedium._read_bytes."""
 
1608
        return self._readable_pipe.read(count)
 
1609
 
 
1610
 
 
1611
class SmartSSHClientMedium(SmartClientStreamMedium):
 
1612
    """A client medium using SSH."""
 
1613
    
 
1614
    def __init__(self, host, port=None, username=None, password=None,
 
1615
            vendor=None):
 
1616
        """Creates a client that will connect on the first use.
 
1617
        
 
1618
        :param vendor: An optional override for the ssh vendor to use. See
 
1619
            bzrlib.transport.ssh for details on ssh vendors.
 
1620
        """
 
1621
        SmartClientStreamMedium.__init__(self)
 
1622
        self._connected = False
 
1623
        self._host = host
 
1624
        self._password = password
 
1625
        self._port = port
 
1626
        self._username = username
 
1627
        self._read_from = None
 
1628
        self._ssh_connection = None
 
1629
        self._vendor = vendor
 
1630
        self._write_to = None
 
1631
 
 
1632
    def _accept_bytes(self, bytes):
 
1633
        """See SmartClientStreamMedium.accept_bytes."""
 
1634
        self._ensure_connection()
 
1635
        self._write_to.write(bytes)
 
1636
 
 
1637
    def disconnect(self):
 
1638
        """See SmartClientMedium.disconnect()."""
 
1639
        if not self._connected:
 
1640
            return
 
1641
        self._read_from.close()
 
1642
        self._write_to.close()
 
1643
        self._ssh_connection.close()
 
1644
        self._connected = False
 
1645
 
 
1646
    def _ensure_connection(self):
 
1647
        """Connect this medium if not already connected."""
 
1648
        if self._connected:
 
1649
            return
 
1650
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
1651
        if self._vendor is None:
 
1652
            vendor = ssh._get_ssh_vendor()
 
1653
        else:
 
1654
            vendor = self._vendor
 
1655
        self._ssh_connection = vendor.connect_ssh(self._username,
 
1656
                self._password, self._host, self._port,
 
1657
                command=[executable, 'serve', '--inet', '--directory=/',
 
1658
                         '--allow-writes'])
 
1659
        self._read_from, self._write_to = \
 
1660
            self._ssh_connection.get_filelike_channels()
 
1661
        self._connected = True
 
1662
 
 
1663
    def _flush(self):
 
1664
        """See SmartClientStreamMedium._flush()."""
 
1665
        self._write_to.flush()
 
1666
 
 
1667
    def _read_bytes(self, count):
 
1668
        """See SmartClientStreamMedium.read_bytes."""
 
1669
        if not self._connected:
 
1670
            raise errors.MediumNotConnected(self)
 
1671
        return self._read_from.read(count)
 
1672
 
 
1673
 
 
1674
class SmartTCPClientMedium(SmartClientStreamMedium):
 
1675
    """A client medium using TCP."""
 
1676
    
 
1677
    def __init__(self, host, port):
 
1678
        """Creates a client that will connect on the first use."""
 
1679
        SmartClientStreamMedium.__init__(self)
 
1680
        self._connected = False
 
1681
        self._host = host
 
1682
        self._port = port
 
1683
        self._socket = None
 
1684
 
 
1685
    def _accept_bytes(self, bytes):
 
1686
        """See SmartClientMedium.accept_bytes."""
 
1687
        self._ensure_connection()
 
1688
        self._socket.sendall(bytes)
 
1689
 
 
1690
    def disconnect(self):
 
1691
        """See SmartClientMedium.disconnect()."""
 
1692
        if not self._connected:
 
1693
            return
 
1694
        self._socket.close()
 
1695
        self._socket = None
 
1696
        self._connected = False
 
1697
 
 
1698
    def _ensure_connection(self):
 
1699
        """Connect this medium if not already connected."""
 
1700
        if self._connected:
 
1701
            return
 
1702
        self._socket = socket.socket()
 
1703
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
1704
        result = self._socket.connect_ex((self._host, int(self._port)))
 
1705
        if result:
 
1706
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
 
1707
                    (self._host, self._port, os.strerror(result)))
 
1708
        self._connected = True
 
1709
 
 
1710
    def _flush(self):
 
1711
        """See SmartClientStreamMedium._flush().
 
1712
        
 
1713
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
1714
        add a means to do a flush, but that can be done in the future.
 
1715
        """
 
1716
 
 
1717
    def _read_bytes(self, count):
 
1718
        """See SmartClientMedium.read_bytes."""
 
1719
        if not self._connected:
 
1720
            raise errors.MediumNotConnected(self)
 
1721
        return self._socket.recv(count)
 
1722
 
 
1723
 
 
1724
class SmartTCPTransport(SmartTransport):
476
1725
    """Connection to smart server over plain tcp.
477
 
 
 
1726
    
478
1727
    This is essentially just a factory to get 'RemoteTransport(url,
479
1728
        SmartTCPClientMedium).
480
1729
    """
481
1730
 
482
 
    def _build_medium(self):
483
 
        client_medium = medium.SmartTCPClientMedium(
484
 
            self._host, self._port, self.base)
485
 
        return client_medium, None
486
 
 
487
 
 
488
 
class RemoteTCPTransportV2Only(RemoteTransport):
489
 
    """Connection to smart server over plain tcp with the client hard-coded to
490
 
    assume protocol v2 and remote server version <= 1.6.
491
 
 
492
 
    This should only be used for testing.
493
 
    """
494
 
 
495
 
    def _build_medium(self):
496
 
        client_medium = medium.SmartTCPClientMedium(
497
 
            self._host, self._port, self.base)
498
 
        client_medium._protocol_version = 2
499
 
        client_medium._remember_remote_is_before((1, 6))
500
 
        return client_medium, None
501
 
 
502
 
 
503
 
class RemoteSSHTransport(RemoteTransport):
 
1731
    def __init__(self, url):
 
1732
        _scheme, _username, _password, _host, _port, _path = \
 
1733
            transport.split_url(url)
 
1734
        try:
 
1735
            _port = int(_port)
 
1736
        except (ValueError, TypeError), e:
 
1737
            raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
 
1738
        medium = SmartTCPClientMedium(_host, _port)
 
1739
        super(SmartTCPTransport, self).__init__(url, medium=medium)
 
1740
 
 
1741
 
 
1742
class SmartSSHTransport(SmartTransport):
504
1743
    """Connection to smart server over SSH.
505
1744
 
506
1745
    This is essentially just a factory to get 'RemoteTransport(url,
507
1746
        SmartSSHClientMedium).
508
1747
    """
509
1748
 
510
 
    def _build_medium(self):
511
 
        location_config = config.LocationConfig(self.base)
512
 
        bzr_remote_path = location_config.get_bzr_remote_path()
513
 
        user = self._user
514
 
        if user is None:
515
 
            auth = config.AuthenticationConfig()
516
 
            user = auth.get_user('ssh', self._host, self._port)
517
 
        client_medium = medium.SmartSSHClientMedium(self._host, self._port,
518
 
            user, self._password, self.base,
519
 
            bzr_remote_path=bzr_remote_path)
520
 
        return client_medium, (user, self._password)
521
 
 
522
 
 
523
 
class RemoteHTTPTransport(RemoteTransport):
 
1749
    def __init__(self, url):
 
1750
        _scheme, _username, _password, _host, _port, _path = \
 
1751
            transport.split_url(url)
 
1752
        try:
 
1753
            if _port is not None:
 
1754
                _port = int(_port)
 
1755
        except (ValueError, TypeError), e:
 
1756
            raise errors.InvalidURL(path=url, extra="invalid port %s" % 
 
1757
                _port)
 
1758
        medium = SmartSSHClientMedium(_host, _port, _username, _password)
 
1759
        super(SmartSSHTransport, self).__init__(url, medium=medium)
 
1760
 
 
1761
 
 
1762
class SmartHTTPTransport(SmartTransport):
524
1763
    """Just a way to connect between a bzr+http:// url and http://.
525
 
 
526
 
    This connection operates slightly differently than the RemoteSSHTransport.
 
1764
    
 
1765
    This connection operates slightly differently than the SmartSSHTransport.
527
1766
    It uses a plain http:// transport underneath, which defines what remote
528
1767
    .bzr/smart URL we are connected to. From there, all paths that are sent are
529
1768
    sent as relative paths, this way, the remote side can properly
531
1770
    HTTP path into a local path.
532
1771
    """
533
1772
 
534
 
    def __init__(self, base, _from_transport=None, http_transport=None):
 
1773
    def __init__(self, url, http_transport=None):
 
1774
        assert url.startswith('bzr+http://')
 
1775
 
535
1776
        if http_transport is None:
536
 
            # FIXME: the password may be lost here because it appears in the
537
 
            # url only for an intial construction (when the url came from the
538
 
            # command-line).
539
 
            http_url = base[len('bzr+'):]
 
1777
            http_url = url[len('bzr+'):]
540
1778
            self._http_transport = transport.get_transport(http_url)
541
1779
        else:
542
1780
            self._http_transport = http_transport
543
 
        super(RemoteHTTPTransport, self).__init__(
544
 
            base, _from_transport=_from_transport)
545
 
 
546
 
    def _build_medium(self):
547
 
        # We let http_transport take care of the credentials
548
 
        return self._http_transport.get_smart_medium(), None
 
1781
        http_medium = self._http_transport.get_smart_medium()
 
1782
        super(SmartHTTPTransport, self).__init__(url, medium=http_medium)
549
1783
 
550
1784
    def _remote_path(self, relpath):
551
 
        """After connecting, HTTP Transport only deals in relative URLs."""
552
 
        # Adjust the relpath based on which URL this smart transport is
553
 
        # connected to.
554
 
        http_base = urlutils.normalize_url(self.get_smart_medium().base)
555
 
        url = urlutils.join(self.base[len('bzr+'):], relpath)
556
 
        url = urlutils.normalize_url(url)
557
 
        return urlutils.relative_url(http_base, url)
 
1785
        """After connecting HTTP Transport only deals in relative URLs."""
 
1786
        if relpath == '.':
 
1787
            return ''
 
1788
        else:
 
1789
            return relpath
 
1790
 
 
1791
    def abspath(self, relpath):
 
1792
        """Return the full url to the given relative path.
 
1793
        
 
1794
        :param relpath: the relative path or path components
 
1795
        :type relpath: str or list
 
1796
        """
 
1797
        return self._unparse_url(self._combine_paths(self._path, relpath))
558
1798
 
559
1799
    def clone(self, relative_url):
560
 
        """Make a new RemoteHTTPTransport related to me.
 
1800
        """Make a new SmartHTTPTransport related to me.
561
1801
 
562
1802
        This is re-implemented rather than using the default
563
 
        RemoteTransport.clone() because we must be careful about the underlying
 
1803
        SmartTransport.clone() because we must be careful about the underlying
564
1804
        http transport.
565
 
 
566
 
        Also, the cloned smart transport will POST to the same .bzr/smart
567
 
        location as this transport (although obviously the relative paths in the
568
 
        smart requests may be different).  This is so that the server doesn't
569
 
        have to handle .bzr/smart requests at arbitrary places inside .bzr
570
 
        directories, just at the initial URL the user uses.
571
1805
        """
572
1806
        if relative_url:
573
1807
            abs_url = self.abspath(relative_url)
574
1808
        else:
575
1809
            abs_url = self.base
576
 
        return RemoteHTTPTransport(abs_url,
577
 
                                   _from_transport=self,
578
 
                                   http_transport=self._http_transport)
579
 
 
580
 
    def _redirected_to(self, source, target):
581
 
        """See transport._redirected_to"""
582
 
        redirected = self._http_transport._redirected_to(source, target)
583
 
        if (redirected is not None
584
 
            and isinstance(redirected, type(self._http_transport))):
585
 
            return RemoteHTTPTransport('bzr+' + redirected.external_url(),
586
 
                                       http_transport=redirected)
587
 
        else:
588
 
            # Either None or a transport for a different protocol
589
 
            return redirected
590
 
 
591
 
 
592
 
class HintingSSHTransport(transport.Transport):
593
 
    """Simple transport that handles ssh:// and points out bzr+ssh://."""
594
 
 
595
 
    def __init__(self, url):
596
 
        raise errors.UnsupportedProtocol(url,
597
 
            'bzr supports bzr+ssh to operate over ssh, use "bzr+%s".' % url)
 
1810
        # By cloning the underlying http_transport, we are able to share the
 
1811
        # connection.
 
1812
        new_transport = self._http_transport.clone(relative_url)
 
1813
        return SmartHTTPTransport(abs_url, http_transport=new_transport)
598
1814
 
599
1815
 
600
1816
def get_test_permutations():
601
1817
    """Return (transport, server) permutations for testing."""
602
1818
    ### We may need a little more test framework support to construct an
603
1819
    ### appropriate RemoteTransport in the future.
604
 
    from bzrlib.tests import test_server
605
 
    return [(RemoteTCPTransport, test_server.SmartTCPServer_for_testing)]
 
1820
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]