# Copyright (C) 2006 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Smart-server protocol, client and server.

Requests are sent as a command and list of arguments, followed by optional
bulk body data.  Responses are similarly a response and list of arguments,
followed by bulk body data. ::

    Fields are separated by Ctrl-A.
  BULK_DATA := CHUNK+ TRAILER
    Chunks can be repeated as many times as necessary.
  CHUNK := CHUNK_LEN CHUNK_BODY
  CHUNK_LEN := DIGIT+ NEWLINE
    Gives the number of bytes in the following chunk.
  CHUNK_BODY := BYTE[chunk_len]
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
  SUCCESS_TRAILER := 'done' NEWLINE
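
As an illustration of the grammar above, here is a minimal sketch of the
framing written in modern Python 3. The function names are illustrative
assumptions, not this module's API, and the decoder handles only a single
chunk followed by the success trailer.

```python
SEP = b"\x01"  # Ctrl-A field separator, as described above


def encode_tuple(args):
    # Join UTF-8 encoded fields with Ctrl-A and end the line with a newline.
    return SEP.join(a.encode("utf-8") for a in args) + b"\n"


def decode_tuple(line):
    # Reverse of encode_tuple; the line must be newline-terminated.
    if not line.endswith(b"\n"):
        raise ValueError("line %r not terminated" % (line,))
    return tuple(f.decode("utf-8") for f in line[:-1].split(SEP))


def encode_bulk_data(body):
    # One CHUNK (length line plus body) followed by the success trailer.
    return b"%d\n" % len(body) + body + b"done\n"


def decode_bulk_data(data):
    # Decode a single-chunk BULK_DATA value and return the body bytes.
    length_line, newline, rest = data.partition(b"\n")
    if not newline or not length_line.isdigit():
        raise ValueError("bad chunk length line %r" % (length_line,))
    chunk_len = int(length_line)
    body, trailer = rest[:chunk_len], rest[chunk_len:]
    if len(body) != chunk_len:
        raise ValueError("short read fetching bulk data chunk")
    if trailer != b"done\n":
        raise ValueError("missing success trailer")
    return body
```

Note that the body is followed directly by the trailer; no newline is added
after the chunk body itself.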

Paths are passed across the network.  The client needs to see a namespace that
includes any repository that might need to be referenced, and the client needs
to know about a root directory beyond which it cannot ascend.

Servers run over ssh will typically want to be able to access any path the user
can access.  Public servers on the other hand (which might be over http, ssh
or tcp) will typically want to restrict access to only a particular directory
and its children, so will want to do a software virtual root at that level.
In other words they'll want to rewrite incoming paths to be under that level
(and prevent escaping using ../ tricks).
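
The virtual-root rewriting described above could be sketched as follows in
modern Python 3. The helper name rebase_path is a hypothetical illustration,
not part of this module, and the sketch is purely lexical: it does not detect
symlinks that point outside the root.

```python
import posixpath


def rebase_path(virtual_root, client_path):
    # Treat the client's path as relative to the virtual root, even when it
    # arrives as an absolute path.
    relative = client_path.lstrip("/")
    root = posixpath.normpath(virtual_root)
    candidate = posixpath.normpath(posixpath.join(root, relative))
    # After normalisation, any '..' components have been resolved, so a path
    # that escaped the root no longer has the root as a prefix.
    if candidate != root and not candidate.startswith(root.rstrip("/") + "/"):
        raise ValueError("path %r escapes the virtual root" % (client_path,))
    return candidate
```

A server using this approach would apply the rewrite to every incoming path
before handing it to the backing transport.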

URLs that include ~ should probably be passed across to the server verbatim
and the server can expand them.  This will probably not be meaningful when
limited to a directory?

At the bottom level there are sockets, pipes and the HTTP server.  For sockets,
multiple requests are sent over one connection, and a read error occurs when
the other side has shut down its sending side.  For pipes, a zero-length read
marks end-of-file.  In an HTTP server environment there is no end-of-stream,
because each request coming into the server is independent.

So we need a wrapper around pipes and sockets to separate out requests from
substrate and this will give us a single model which is consistent for HTTP,

MEDIUM  (factory for protocol, reads bytes & pushes to protocol,
         uses protocol to detect end-of-request, sends written
         bytes to client) e.g. socket, pipe, HTTP request handler.

PROTOCOL  (serialisation, deserialisation)  accepts bytes for one
          request, decodes according to internal state, pushes
          structured data to handler.  accepts structured data from
          handler and encodes and writes to the medium.  factory for

HANDLER  (domain logic) accepts structured data, operates state
         machine until the request can be satisfied,
         sends structured data to the protocol.
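
The server-side layering above can be sketched as follows in modern Python 3.
This is a simplified illustration under stated assumptions: EchoHandler,
LineProtocol and serve_one_request are hypothetical names, requests carry no
bulk body, and the only command understood is a made-up 'hello'.

```python
class EchoHandler:
    # HANDLER: domain logic only; accepts and returns structured data.
    def handle(self, command, args):
        if command == "hello":
            return ("ok", "1")
        return ("error", "bad request %r" % (command,))


class LineProtocol:
    # PROTOCOL: decodes bytes for one request and encodes the response.
    def __init__(self, handler, write):
        self._handler = handler
        self._write = write
        self._buffer = b""
        self.finished_reading = False

    def accept_bytes(self, data):
        self._buffer += data
        if self.finished_reading or b"\n" not in self._buffer:
            return  # no complete request line yet
        line, _, self._buffer = self._buffer.partition(b"\n")
        fields = tuple(f.decode("utf-8") for f in line.split(b"\x01"))
        response = self._handler.handle(fields[0], fields[1:])
        self._write(b"\x01".join(r.encode("utf-8") for r in response) + b"\n")
        self.finished_reading = True


def serve_one_request(read_chunk, write, handler):
    # MEDIUM: creates the protocol and pushes bytes into it until the
    # protocol reports end-of-request. Returns False if the peer closed.
    protocol = LineProtocol(handler, write)
    while not protocol.finished_reading:
        data = read_chunk()
        if not data:
            return False
        protocol.accept_bytes(data)
    return True
```

Because the medium only moves bytes and asks the protocol whether the request
is complete, the same protocol and handler could sit on top of a socket, a
pipe or an HTTP request body.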

CLIENT  domain logic, accepts domain requests, generates structured
        data, reads structured data from responses and turns into
        domain data.  Sends structured data to the protocol.
        Operates state machines until the request can be delivered
        (e.g. reading from a bundle generated in bzrlib to deliver a

        Possibly this should just be RemoteBzrDir, RemoteTransport,

PROTOCOL  (serialisation, deserialisation)  accepts structured data for one
          request, encodes and writes to the medium.  Reads bytes from the
          medium, decodes and allows the client to read structured data.

MEDIUM  (accepts bytes from the protocol & delivers to the remote server.
         Allows the protocol to read bytes) e.g. socket, pipe, HTTP request.
"""

# TODO: _translate_error should be on the client, not the transport because
#     error coding is wire protocol specific.
#
# TODO: A plain integer from query_version is too simple; should give some
#
# TODO: Server should probably catch exceptions within itself and send them
# back across the network.  (But shouldn't catch KeyboardInterrupt etc)
# Also needs to somehow report protocol errors like bad requests.  Need to
# consider how we'll handle error reporting, e.g. if we get halfway through a
# bulk transfer and then something goes wrong.
#
# TODO: Standard marker at start of request/response lines?
#
# TODO: Make each request and response self-validatable, e.g. with checksums.
#
# TODO: get/put objects could be changed to gradually read back the data as it
# comes across the network
#
# TODO: What should the server do if it hits an error and has to terminate?
#
# TODO: is it useful to allow multiple chunks in the bulk data?
#
# TODO: If we get an exception during transmission of bulk data we can't just
# emit the exception because it won't be seen.
#   John proposes:  I think it would be worthwhile to have a header on each
#   chunk, that indicates it is another chunk. Then you can send an 'error'
#   chunk as long as you finish the previous chunk.
#
# TODO: Clone method on Transport; should work up towards parent directory;
# unclear how this should be stored or communicated to the server... maybe
# just pass it on all relevant requests?
#
# TODO: Better name than clone() for changing between directories.  How about
# open_dir or change_dir or chdir?
#
# TODO: Is it really good to have the notion of current directory within the
# connection?  Perhaps all Transports should factor out a common connection
# from the thing that has the directory context?
#
# TODO: Pull more things common to sftp and ssh to a higher level.
#
# TODO: The server that manages a connection should be quite small and retain
# minimum state because each of the requests is supposed to be stateless.
# Then we can write another implementation that maps to http.
#
# TODO: What to do when a client connection is garbage collected?  Maybe just
# abruptly drop the connection?
#
# TODO: Server in some cases will need to restrict access to files outside of
# a particular root directory.  LocalTransport doesn't do anything to stop you
# ascending above the base directory, so we need to prevent paths
# containing '..' in either the server or transport layers.  (Also need to
# consider what happens if someone creates a symlink pointing outside the
#
# TODO: Server should rebase absolute paths coming across the network to put
# them under the virtual root, if one is in use.  LocalTransport currently
# doesn't do that; if you give it an absolute path it just uses it.
#
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
# urlescape them instead.  Indeed possibly this should just literally be
#
# FIXME: This transport, with several others, has imperfect handling of paths
# within urls.  It'd probably be better for ".." from a root to raise an error
# rather than return the same directory as we do at present.
#
# TODO: Rather than working at the Transport layer we want Branch,
# Repository or BzrDir objects that talk to a server.
#
# TODO: Probably want some way for server commands to gradually produce body
# data rather than passing it as a string; they could perhaps pass an
# iterator-like callback that will gradually yield data; it probably needs a
# close() method that will always be called to do any necessary cleanup.
#
# TODO: Split the actual smart server from the ssh encoding of it.
#
# TODO: Perhaps support file-level readwrite operations over the transport
#
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
# branch doing file-level operations.
#
# TODO: jam 20060915 _decode_tuple is acting directly on input over
#       the socket, and it assumes everything is UTF8 sections separated
#       by \001. Which means a request like '\002' will abort the connection
#       because of a UnicodeDecodeError. It does look like invalid data will
#       kill the SmartServerStreamMedium, but only with an abort + exception, and
#       the overall server shouldn't die.

from cStringIO import StringIO
from bzrlib.bundle.serializer import write_bundle
from bzrlib.trace import mutter
from bzrlib.transport import local

# must do this otherwise urllib can't parse the urls properly :(
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
    transport.register_urlparse_netloc_protocol(scheme)


def _recv_tuple(from_file):
    req_line = from_file.readline()
    return _decode_tuple(req_line)


def _decode_tuple(req_line):
    if req_line is None or req_line == '':
        # end of file: the client closed the connection
        return None
    if req_line[-1] != '\n':
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
    return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))


def _encode_tuple(args):
    """Encode the tuple args to a bytestream."""
    return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'


class SmartProtocolBase(object):
    """Methods common to client and server"""

    # TODO: this only actually accommodates a single block; possibly should support

    def _recv_bulk(self):
        # This is OBSOLETE except for the double handling in the server:
        # the read_bulk + reencode noise.
        chunk_len = self._in.readline()
        try:
            chunk_len = int(chunk_len)
        except ValueError:
            raise errors.SmartProtocolError("bad chunk length line %r" % chunk_len)
        bulk = self._in.read(chunk_len)
        if len(bulk) != chunk_len:
            raise errors.SmartProtocolError("short read fetching bulk data chunk")
        # consume the trailer, then hand the body back to the caller
        self._recv_trailer()
        return bulk

    def _recv_trailer(self):
        resp = self._recv_tuple()
        if resp == ('done', ):
            return
        else:
            self._translate_error(resp)

    def _encode_bulk_data(self, body):
        """Encode body as a bulk data chunk."""
        return ''.join(('%d\n' % len(body), body, 'done\n'))

    def _serialise_offsets(self, offsets):
        """Serialise a readv offset list."""
        txt = []
        for start, length in offsets:
            txt.append('%d,%d' % (start, length))
        return '\n'.join(txt)

    def _send_bulk_data(self, body, a_file=None):
        """Send chunked body data"""
        assert isinstance(body, str)
        bytes = self._encode_bulk_data(body)
        self._write_and_flush(bytes, a_file)

    def _write_and_flush(self, bytes, a_file=None):
        """Write bytes to self._out and flush it."""
        # XXX: this will be inefficient.  Just ask Robert.
301
class SmartServerRequestProtocolOne(SmartProtocolBase):
302
"""Server-side encoding and decoding logic for smart version 1."""
304
def __init__(self, output_stream, backing_transport):
305
self._out_stream = output_stream
306
self._backing_transport = backing_transport
307
self.finished_reading = False
309
self.has_dispatched = False
311
self._body_decoder = None
313
def accept_bytes(self, bytes):
314
"""Take bytes, and advance the internal state machine appropriately.
316
:param bytes: must be a byte string
318
assert isinstance(bytes, str)
319
self.in_buffer += bytes
320
if not self.has_dispatched:
321
if '\n' not in self.in_buffer:
322
# no command line yet
324
self.has_dispatched = True
325
# XXX if in_buffer not \n-terminated this will do the wrong
328
assert self.in_buffer.endswith('\n')
329
req_args = _decode_tuple(self.in_buffer)
331
self.request = SmartServerRequestHandler(
332
self._backing_transport)
333
self.request.dispatch_command(req_args[0], req_args[1:])
334
if self.request.finished_reading:
336
self._send_response(self.request.response.args,
337
self.request.response.body)
338
self.sync_with_request(self.request)
340
except KeyboardInterrupt:
342
except Exception, exception:
343
# everything else: pass to client, flush, and quit
344
self._send_response(('error', str(exception)))
348
if self.finished_reading:
349
# nothing to do.XXX: this routine should be a single state
352
if self._body_decoder is None:
353
self._body_decoder = LengthPrefixedBodyDecoder()
354
self._body_decoder.accept_bytes(self.in_buffer)
355
self.in_buffer = self._body_decoder.unused_data
356
body_data = self._body_decoder.read_pending_data()
357
self.request.accept_body(body_data)
358
if self._body_decoder.finished_reading:
359
self.request.end_of_body()
360
assert self.request.finished_reading, \
361
"no more body, request not finished"
362
self.sync_with_request(self.request)
363
if self.request.response is not None:
364
self._send_response(self.request.response.args,
365
self.request.response.body)
367
assert not self.request.finished_reading, \
368
"no response and we have finished reading."

    def _send_response(self, args, body=None):
        """Send a smart server response down the output stream."""
        self._out_stream.write(_encode_tuple(args))
        if body is None:
            self._out_stream.flush()
        else:
            self._send_bulk_data(body, self._out_stream)
            #self._out_stream.write('BLARGH')

    def sync_with_request(self, request):
        self.finished_reading = request.finished_reading
383
class LengthPrefixedBodyDecoder(object):
384
"""Decodes the length-prefixed bulk data."""
387
self.finished_reading = False
388
self.unused_data = ''
389
self.state_accept = self._state_accept_expecting_length
390
self.state_read = self._state_read_no_data
392
self._trailer_buffer = ''
394
def accept_bytes(self, bytes):
395
"""Decode as much of bytes as possible.
397
If 'bytes' contains too much data it will be appended to
400
finished_reading will be set when no more data is required. Further
401
data will be appended to self.unused_data.
403
# accept_bytes is allowed to change the state
404
current_state = self.state_accept
405
self.state_accept(bytes)
406
while current_state != self.state_accept:
407
current_state = self.state_accept
408
self.state_accept('')
410
def read_pending_data(self):
411
"""Return any pending data that has been decoded."""
412
return self.state_read()
414
def _state_accept_expecting_length(self, bytes):
415
self._in_buffer += bytes
416
pos = self._in_buffer.find('\n')
419
self.bytes_left = int(self._in_buffer[:pos])
420
self._in_buffer = self._in_buffer[pos+1:]
421
self.bytes_left -= len(self._in_buffer)
422
self.state_accept = self._state_accept_reading_body
423
self.state_read = self._state_read_in_buffer
425
def _state_accept_reading_body(self, bytes):
426
self._in_buffer += bytes
427
self.bytes_left -= len(bytes)
428
if self.bytes_left <= 0:
430
if self.bytes_left != 0:
431
self._trailer_buffer = self._in_buffer[self.bytes_left:]
432
self._in_buffer = self._in_buffer[:self.bytes_left]
433
self.state_accept = self._state_accept_reading_trailer
435
def _state_accept_reading_trailer(self, bytes):
436
self._trailer_buffer += bytes
437
if self._trailer_buffer.startswith('done\n'):
438
self.unused_data = self._trailer_buffer[len('done\n'):]
439
self.state_accept = self._state_accept_reading_unused
440
self.finished_reading = True
442
def _state_accept_reading_unused(self, bytes):
443
self.unused_data += bytes
445
def _state_read_no_data(self):
448
def _state_read_in_buffer(self):
449
result = self._in_buffer
454
class SmartServerStreamMedium(SmartProtocolBase):
455
"""Handles smart commands coming over a stream.
457
The stream may be a pipe connected to sshd, or a tcp socket, or an
458
in-process fifo for testing.
460
One instance is created for each connected client; it can serve multiple
461
requests in the lifetime of the connection.
463
The server passes requests through to an underlying backing transport,
464
which will typically be a LocalTransport looking at the server's filesystem.
467
def __init__(self, in_file, out_file, backing_transport):
468
"""Construct new server.
470
:param in_file: Python file from which requests can be read.
471
:param out_file: Python file to write responses.
472
:param backing_transport: Transport for the directory served.
476
self.backing_transport = backing_transport
478
def _recv_tuple(self):
479
"""Read a request from the client and return as a tuple.
481
Returns None at end of file (if the client closed the connection.)
483
# ** Deserialise and read bytes
484
return _recv_tuple(self._in)
486
def _send_tuple(self, args):
487
"""Send response header"""
488
# ** serialise and write bytes
489
return self._write_and_flush(_encode_tuple(args))
491
def _send_error_and_disconnect(self, exception):
492
# ** serialise and write bytes
493
self._send_tuple(('error', str(exception)))

    def _serve_one_request(self):
        """Read one request from input, process, send back a response.

        :return: False if the server should terminate, otherwise None.
        """
        # ** deserialise, read bytes, serialise and write bytes
        req_line = self._in.readline()
        # this should just test "req_line == ''", surely?  -- Andrew Bennetts
        if req_line in ('', None):
            # client closed connection
            return False  # shutdown server
        try:
            protocol = SmartServerRequestProtocolOne(self._out,
                                                     self.backing_transport)
            protocol.accept_bytes(req_line)
            if not protocol.finished_reading:
                # this boils down to readline which won't block on open sockets
                # without data. We should really though read as much as is
                # available and then hand to that accept_bytes without this
                # silly double-decode.
                bulk = self._recv_bulk()
                bulk_bytes = ''.join(('%d\n' % len(bulk), bulk, 'done\n'))
                protocol.accept_bytes(bulk_bytes)
                # might be nice to do protocol.end_of_bytes()
                # because self._recv_bulk reads all the bytes, this must finish
                # after one delivery of data rather than looping.
                assert protocol.finished_reading
        except KeyboardInterrupt:
            raise
        except Exception, e:
            # everything else: pass to client, flush, and quit
            self._send_error_and_disconnect(e)
532
"""Serve requests until the client disconnects."""
533
# Keep a reference to stderr because the sys module's globals get set to
534
# None during interpreter shutdown.
535
from sys import stderr
537
while self._serve_one_request() != False:
540
stderr.write("%s terminating on exception %s\n" % (self, e))
544
class SmartServerResponse(object):
545
"""Response generated by SmartServerRequestHandler."""
547
def __init__(self, args, body=None):
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
# for delivering the data for a request. This could be done with the
# StreamServer, though that would create conflation between request and response
# which may be undesirable.
557
class SmartServerRequestHandler(object):
558
"""Protocol logic for smart server.
560
This doesn't handle serialization at all, it just processes requests and
564
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
565
# not contain encoding or decoding logic to allow the wire protocol to vary
566
# from the object protocol: we will want to tweak the wire protocol separate
567
# from the object model, and ideally we will be able to do that without
568
# having a SmartServerRequestHandler subclass for each wire protocol, rather
569
# just a Protocol subclass.
571
# TODO: Better way of representing the body for commands that take it,
572
# and allow it to be streamed into the server.
574
def __init__(self, backing_transport):
575
self._backing_transport = backing_transport
576
self._converted_command = False
577
self.finished_reading = False
578
self._body_bytes = ''
581
def accept_body(self, bytes):
This should be overridden for each command that desires body data to
handle the right format of that data, i.e. plain bytes, a bundle etc.
587
The deserialisation into that format should be done in the Protocol
588
object. Set self.desired_body_format to the format your method will
591
# default fallback is to accumulate bytes.
592
self._body_bytes += bytes
594
def _end_of_body_handler(self):
595
"""An unimplemented end of body handler."""
596
raise NotImplementedError(self._end_of_body_handler)
599
"""Answer a version request with my version."""
600
return SmartServerResponse(('ok', '1'))

    def do_has(self, relpath):
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
        return SmartServerResponse((r,))

    def do_get(self, relpath):
        backing_bytes = self._backing_transport.get_bytes(relpath)
        return SmartServerResponse(('ok',), backing_bytes)
610
def _deserialise_optional_mode(self, mode):
611
# XXX: FIXME this should be on the protocol object.

    def do_append(self, relpath, mode):
        self._converted_command = True
        self._relpath = relpath
        self._mode = self._deserialise_optional_mode(mode)
        self._end_of_body_handler = self._handle_do_append_end

    def _handle_do_append_end(self):
        old_length = self._backing_transport.append_bytes(
            self._relpath, self._body_bytes, self._mode)
        self.response = SmartServerResponse(('appended', '%d' % old_length))

    def do_delete(self, relpath):
        self._backing_transport.delete(relpath)

    def do_iter_files_recursive(self, abspath):
        # XXX: the path handling needs some thought.
        #relpath = self._backing_transport.relpath(abspath)
        transport = self._backing_transport.clone(abspath)
        filenames = transport.iter_files_recursive()
        return SmartServerResponse(('names',) + tuple(filenames))

    def do_list_dir(self, relpath):
        filenames = self._backing_transport.list_dir(relpath)
        return SmartServerResponse(('names',) + tuple(filenames))

    def do_mkdir(self, relpath, mode):
        self._backing_transport.mkdir(relpath,
                                      self._deserialise_optional_mode(mode))

    def do_move(self, rel_from, rel_to):
        self._backing_transport.move(rel_from, rel_to)

    def do_put(self, relpath, mode):
        self._converted_command = True
        self._relpath = relpath
        self._mode = self._deserialise_optional_mode(mode)
        self._end_of_body_handler = self._handle_do_put

    def _handle_do_put(self):
        self._backing_transport.put_bytes(self._relpath,
                self._body_bytes, self._mode)
        self.response = SmartServerResponse(('ok',))
660
def _deserialise_offsets(self, text):
661
# XXX: FIXME this should be on the protocol object.
663
for line in text.split('\n'):
666
start, length = line.split(',')
667
offsets.append((int(start), int(length)))
670
def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
671
self._converted_command = True
672
self._end_of_body_handler = self._handle_put_non_atomic
673
self._relpath = relpath
674
self._dir_mode = self._deserialise_optional_mode(dir_mode)
675
self._mode = self._deserialise_optional_mode(mode)
676
# a boolean would be nicer XXX
677
self._create_parent = (create_parent == 'T')
679
def _handle_put_non_atomic(self):
680
self._backing_transport.put_bytes_non_atomic(self._relpath,
683
create_parent_dir=self._create_parent,
684
dir_mode=self._dir_mode)
685
self.response = SmartServerResponse(('ok',))

    def do_readv(self, relpath):
        self._converted_command = True
        self._end_of_body_handler = self._handle_readv_offsets
        self._relpath = relpath

    def end_of_body(self):
        """No more body data will be received."""
        self._run_handler_code(self._end_of_body_handler, (), {})
        # cannot read after this.
        self.finished_reading = True

    def _handle_readv_offsets(self):
        """accept offsets for a readv request."""
        offsets = self._deserialise_offsets(self._body_bytes)
        backing_bytes = ''.join(bytes for offset, bytes in
                             self._backing_transport.readv(self._relpath, offsets))
        self.response = SmartServerResponse(('readv',), backing_bytes)

    def do_rename(self, rel_from, rel_to):
        self._backing_transport.rename(rel_from, rel_to)

    def do_rmdir(self, relpath):
        self._backing_transport.rmdir(relpath)

    def do_stat(self, relpath):
        stat = self._backing_transport.stat(relpath)
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))

    def do_get_bundle(self, path, revision_id):
        # open transport relative to our base
        t = self._backing_transport.clone(path)
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
        repo = control.open_repository()
        tmpf = tempfile.TemporaryFile()
        base_revision = revision.NULL_REVISION
        write_bundle(repo, revision_id, base_revision, tmpf)
        # rewind so that read() returns the bundle that was just written
        tmpf.seek(0)
        return SmartServerResponse((), tmpf.read())
726
def dispatch_command(self, cmd, args):
727
"""Deprecated compatibility method.""" # XXX XXX
728
func = getattr(self, 'do_' + cmd, None)
730
raise errors.SmartProtocolError("bad request %r" % (cmd,))
731
self._run_handler_code(func, args, {})
733
def _run_handler_code(self, callable, args, kwargs):
734
"""Run some handler specific code 'callable'.
If a result is returned, it is considered to be the command's response,
and finished_reading is set true, and it's assigned to self.response.
739
Any exceptions caught are translated and a response object created
742
result = self._call_converting_errors(callable, args, kwargs)
743
if result is not None:
744
self.response = result
745
self.finished_reading = True
746
# handle unconverted commands
747
if not self._converted_command:
748
self.finished_reading = True
750
self.response = SmartServerResponse(('ok',))
752
def _call_converting_errors(self, callable, args, kwargs):
753
"""Call callable converting errors to Response objects."""
755
return callable(*args, **kwargs)
756
except errors.NoSuchFile, e:
757
return SmartServerResponse(('NoSuchFile', e.path))
758
except errors.FileExists, e:
759
return SmartServerResponse(('FileExists', e.path))
760
except errors.DirectoryNotEmpty, e:
761
return SmartServerResponse(('DirectoryNotEmpty', e.path))
762
except errors.ShortReadvError, e:
763
return SmartServerResponse(('ShortReadvError',
764
e.path, str(e.offset), str(e.length), str(e.actual)))
765
except UnicodeError, e:
# If it is a DecodeError, then most likely we are starting
# with a plain string
768
str_or_unicode = e.object
769
if isinstance(str_or_unicode, unicode):
770
val = u'u:' + str_or_unicode
772
val = u's:' + str_or_unicode.encode('base64')
773
# This handles UnicodeEncodeError or UnicodeDecodeError
774
return SmartServerResponse((e.__class__.__name__,
775
e.encoding, val, str(e.start), str(e.end), e.reason))
776
except errors.TransportNotPossible, e:
777
if e.msg == "readonly transport":
778
return SmartServerResponse(('ReadOnlyError', ))
783
class SmartTCPServer(object):
784
"""Listens on a TCP socket and accepts connections from smart clients"""
786
def __init__(self, backing_transport=None, host='127.0.0.1', port=0):
787
"""Construct a new server.
789
To actually start it running, call either start_background_thread or
792
:param host: Name of the interface to listen on.
793
:param port: TCP port to listen on, or 0 to allocate a transient port.
795
if backing_transport is None:
796
backing_transport = memory.MemoryTransport()
797
self._server_socket = socket.socket()
798
self._server_socket.bind((host, port))
799
self.port = self._server_socket.getsockname()[1]
800
self._server_socket.listen(1)
801
self._server_socket.settimeout(1)
802
self.backing_transport = backing_transport
805
# let connections timeout so that we get a chance to terminate
806
# Keep a reference to the exceptions we want to catch because the socket
807
# module's globals get set to None during interpreter shutdown.
808
from socket import timeout as socket_timeout
809
from socket import error as socket_error
810
self._should_terminate = False
811
while not self._should_terminate:
813
self.accept_and_serve()
814
except socket_timeout:
815
# just check if we're asked to stop
817
except socket_error, e:
818
trace.warning("client disconnected: %s", e)
822
"""Return the url of the server"""
823
return "bzr://%s:%d/" % self._server_socket.getsockname()

    def accept_and_serve(self):
        conn, client_addr = self._server_socket.accept()
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        from_client = conn.makefile('r')
        to_client = conn.makefile('w')
        handler = SmartServerStreamMedium(from_client, to_client,
                                          self.backing_transport)
        connection_thread = threading.Thread(None, handler.serve,
                                             name='smart-server-child')
        connection_thread.setDaemon(True)
        connection_thread.start()
836
def start_background_thread(self):
837
self._server_thread = threading.Thread(None,
839
name='server-' + self.get_url())
840
self._server_thread.setDaemon(True)
841
self._server_thread.start()
843
def stop_background_thread(self):
844
self._should_terminate = True
845
# self._server_socket.close()
846
# we used to join the thread, but it's not really necessary; it will
848
## self._server_thread.join()
851
class SmartTCPServer_for_testing(SmartTCPServer):
852
"""Server suitable for use by transport tests.
854
This server is backed by the process's cwd.
858
self._homedir = os.getcwd()
# The server is set up by default like for ssh access: the client
# passes filesystem-absolute paths; therefore the server must look
# them up relative to the root directory.  It might be better to act
# as a public server and have the server rewrite paths into the test
864
SmartTCPServer.__init__(self, transport.get_transport("file:///"))
867
"""Set up server for testing"""
868
self.start_background_thread()
871
self.stop_background_thread()
874
"""Return the url of the server"""
875
host, port = self._server_socket.getsockname()
876
# XXX: I think this is likely to break on windows -- self._homedir will
877
# have backslashes (and maybe a drive letter?).
878
# -- Andrew Bennetts, 2006-08-29
879
return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
881
def get_bogus_url(self):
882
"""Return a URL which will fail to connect"""
883
return 'bzr://127.0.0.1:1/'
886
class SmartStat(object):
888
def __init__(self, size, mode):
893
class SmartTransport(transport.Transport):
894
"""Connection to a smart server.
896
The connection holds references to pipes that can be used to send requests
899
The connection has a notion of the current directory to which it's
900
connected; this is incorporated in filenames passed to the server.
902
This supports some higher-level RPC operations and can also be treated
903
like a Transport to do file-like operations.
905
The connection can be made over a tcp socket, or (in future) an ssh pipe
906
or a series of http requests. There are concrete subclasses for each
907
type: SmartTCPTransport, etc.
910
# IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
911
# responsibilities: Put those on SmartClient or similar. This is vital for
912
# the ability to support multiple versions of the smart protocol over time:
913
# SmartTransport is an adapter from the Transport object model to the
914
# SmartClient model, not an encoder.
916
def __init__(self, url, clone_from=None, medium=None):
919
:param medium: The medium to use for this RemoteTransport. This must be
920
supplied if clone_from is None.
922
### Technically super() here is faulty because Transport's __init__
923
### fails to take 2 parameters, and if super were to choose a silly
924
### initialisation order things would blow up.
925
if not url.endswith('/'):
927
super(SmartTransport, self).__init__(url)
928
self._scheme, self._username, self._password, self._host, self._port, self._path = \
929
transport.split_url(url)
930
if clone_from is None:
931
self._medium = medium
933
# credentials may be stripped from the base in some circumstances
934
# as yet to be clearly defined or documented, so copy them.
935
self._username = clone_from._username
936
# reuse same connection
937
self._medium = clone_from._medium
938
assert self._medium is not None

    def abspath(self, relpath):
        """Return the full url to the given relative path.

        @param relpath: the relative path or path components
        @type relpath: str or list
        """
        return self._unparse_url(self._remote_path(relpath))

    def clone(self, relative_url):
        """Make a new SmartTransport related to me, sharing the same connection.

        This essentially opens a handle on a different remote directory.
        """
        if relative_url is None:
            return SmartTransport(self.base, self)
        else:
            return SmartTransport(self.abspath(relative_url), self)

    def is_readonly(self):
        """Smart server transport can do read/write file operations."""
        return False

    def get_smart_client(self):

    def get_smart_medium(self):

    def _unparse_url(self, path):
        """Return URL for a path.

        :see: SFTPUrlHandling._unparse_url
        """
        # TODO: Eventually it should be possible to unify this with
        # SFTPUrlHandling._unparse_url?
        path = urllib.quote(path)
        netloc = urllib.quote(self._host)
        if self._username is not None:
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
        if self._port is not None:
            netloc = '%s:%d' % (netloc, self._port)
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))

    def _remote_path(self, relpath):
        """Returns the Unicode version of the absolute path for relpath."""
        return self._combine_paths(self._path, relpath)

    def _call(self, method, *args):
        resp = self._call2(method, *args)
        self._translate_error(resp)

    def _call2(self, method, *args):
        """Call a method on the remote server."""
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
        protocol.call(method, *args)
        return protocol.read_response_tuple()

    def _call_with_body_bytes(self, method, args, body):
        """Call a method on the remote server with body bytes."""
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
        protocol.call_with_body_bytes((method, ) + args, body)
        return protocol.read_response_tuple()

    def has(self, relpath):
        """Indicate whether a remote file of the given name exists or not.

        :see: Transport.has()
        """
        resp = self._call2('has', self._remote_path(relpath))
        if resp == ('yes', ):
            return True
        elif resp == ('no', ):
            return False
        else:
            self._translate_error(resp)

    def get(self, relpath):
        """Return file-like object reading the contents of a remote file.

        :see: Transport.get_bytes()/get_file()
        """
        return StringIO(self.get_bytes(relpath))

    def get_bytes(self, relpath):
        remote = self._remote_path(relpath)
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
        protocol.call('get', remote)
        resp = protocol.read_response_tuple(True)
        if resp != ('ok', ):
            protocol.cancel_read_body()
            self._translate_error(resp, relpath)
        return protocol.read_body_bytes()

    def _serialise_optional_mode(self, mode):

    def mkdir(self, relpath, mode=None):
        resp = self._call2('mkdir', self._remote_path(relpath),
            self._serialise_optional_mode(mode))
        self._translate_error(resp)

    def put_bytes(self, relpath, upload_contents, mode=None):
        # FIXME: upload_file is probably not safe for non-ascii characters -
        # should probably just pass all parameters as length-delimited
        resp = self._call_with_body_bytes('put',
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
            upload_contents)
        self._translate_error(resp)

    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
                             create_parent_dir=False,
                             dir_mode=None):
        """See Transport.put_bytes_non_atomic."""
        # FIXME: no encoding in the transport!
        create_parent_str = 'F'
        if create_parent_dir:
            create_parent_str = 'T'

        resp = self._call_with_body_bytes(
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
             create_parent_str, self._serialise_optional_mode(dir_mode)),
        self._translate_error(resp)

    def put_file(self, relpath, upload_file, mode=None):
        # it's not ideal to seek back, but currently put_non_atomic_file depends
        # on transports not reading before failing - which is a faulty
        # assumption I think - RBC 20060915
        pos = upload_file.tell()
        try:
            return self.put_bytes(relpath, upload_file.read(), mode)
        except:
            upload_file.seek(pos)
            raise

    def put_file_non_atomic(self, relpath, f, mode=None,
                            create_parent_dir=False,
                            dir_mode=None):
        return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
                                         create_parent_dir=create_parent_dir,
                                         dir_mode=dir_mode)

    def append_file(self, relpath, from_file, mode=None):
        return self.append_bytes(relpath, from_file.read(), mode)

    def append_bytes(self, relpath, bytes, mode=None):
        resp = self._call_with_body_bytes(
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
        if resp[0] == 'appended':
        self._translate_error(resp)

    def delete(self, relpath):
        resp = self._call2('delete', self._remote_path(relpath))
        self._translate_error(resp)

    def readv(self, relpath, offsets):
        offsets = list(offsets)

        sorted_offsets = sorted(offsets)
        # iterate over the offsets in the order the caller requested them
        offset_stack = iter(offsets)
        cur_offset_and_size = offset_stack.next()
        coalesced = list(self._coalesce_offsets(sorted_offsets,
                               limit=self._max_readv_combine,
                               fudge_factor=self._bytes_to_read_before_seek))

        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
        protocol.call_with_body_readv_array(
            ('readv', self._remote_path(relpath)),
            [(c.start, c.length) for c in coalesced])
        resp = protocol.read_response_tuple(True)

        if resp[0] != 'readv':
            # This should raise an exception
            protocol.cancel_read_body()
            self._translate_error(resp)

        # FIXME: this should know how many bytes are needed, for clarity.
        data = protocol.read_body_bytes()
        # Cache the results, but only until they have been fulfilled
        data_map = {}
        for c_offset in coalesced:
            if len(data) < c_offset.length:
                raise errors.ShortReadvError(relpath, c_offset.start,
                            c_offset.length, actual=len(data))
            for suboffset, subsize in c_offset.ranges:
                key = (c_offset.start+suboffset, subsize)
                data_map[key] = data[suboffset:suboffset+subsize]
            data = data[c_offset.length:]

            # Now that we've read some data, see if we can yield anything back
            while cur_offset_and_size in data_map:
                this_data = data_map.pop(cur_offset_and_size)
                yield cur_offset_and_size[0], this_data
                cur_offset_and_size = offset_stack.next()

    def rename(self, rel_from, rel_to):
        self._call('rename',
                   self._remote_path(rel_from),
                   self._remote_path(rel_to))

    def move(self, rel_from, rel_to):
        self._call('move',
                   self._remote_path(rel_from),
                   self._remote_path(rel_to))

    def rmdir(self, relpath):
        resp = self._call('rmdir', self._remote_path(relpath))

    def _translate_error(self, resp, orig_path=None):
        """Raise an exception from a response"""
        elif what == 'NoSuchFile':
            if orig_path is not None:
                error_path = orig_path
            else:
                error_path = resp[1]
            raise errors.NoSuchFile(error_path)
        elif what == 'error':
            raise errors.SmartProtocolError(unicode(resp[1]))
        elif what == 'FileExists':
            raise errors.FileExists(resp[1])
        elif what == 'DirectoryNotEmpty':
            raise errors.DirectoryNotEmpty(resp[1])
        elif what == 'ShortReadvError':
            raise errors.ShortReadvError(resp[1], int(resp[2]),
                                         int(resp[3]), int(resp[4]))
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
            encoding = str(resp[1]) # encoding must always be a string
            val = resp[2]
            start = int(resp[3])
            end = int(resp[4])
            reason = str(resp[5]) # reason must always be a string
            if val.startswith('u:'):
            elif val.startswith('s:'):
                val = val[2:].decode('base64')
            if what == 'UnicodeDecodeError':
                raise UnicodeDecodeError(encoding, val, start, end, reason)
            elif what == 'UnicodeEncodeError':
                raise UnicodeEncodeError(encoding, val, start, end, reason)
        elif what == "ReadOnlyError":
            raise errors.TransportNotPossible('readonly transport')
        else:
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))

    def disconnect(self):
        self._medium.disconnect()

    def delete_tree(self, relpath):
        raise errors.TransportNotPossible('readonly transport')

    def stat(self, relpath):
        resp = self._call2('stat', self._remote_path(relpath))
        if resp[0] == 'stat':
            return SmartStat(int(resp[1]), int(resp[2], 8))
        else:
            self._translate_error(resp)

    ## def lock_read(self, relpath):
    ##     """Lock the given file for shared (read) access.
    ##     :return: A lock object, which should be passed to Transport.unlock()
    ##     """
    ##     # The old RemoteBranch ignored locks for reading, so we will
    ##     # continue that tradition and return a bogus lock object.
    ##     class BogusLock(object):
    ##         def __init__(self, path):
    ##             self.path = path
    ##         def unlock(self):
    ##             pass
    ##     return BogusLock(relpath)

    def list_dir(self, relpath):
        resp = self._call2('list_dir', self._remote_path(relpath))
        if resp[0] == 'names':
            return [name.encode('ascii') for name in resp[1:]]
        else:
            self._translate_error(resp)

    def iter_files_recursive(self):
        resp = self._call2('iter_files_recursive', self._remote_path(''))
        if resp[0] == 'names':
        self._translate_error(resp)
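

# Illustrative sketch only, not part of the transport: SmartTransport.readv()
# requests coalesced ranges and then hands data back in the order the caller
# asked for it. The helper below mimics that bookkeeping with plain tuples.
# The name ``yield_in_request_order`` and the (start, length, ranges) tuple
# layout are assumptions made for this example; the real method works on the
# objects produced by self._coalesce_offsets() and raises ShortReadvError
# rather than ValueError.
def yield_in_request_order(requested, coalesced, data):
    """Yield (offset, bytes) for each requested (offset, size) pair.

    :param requested: (offset, size) pairs in the caller's order.
    :param coalesced: (start, length, ranges) tuples sorted by start, where
        ranges holds (suboffset, subsize) pairs relative to start.
    :param data: the concatenated bytes of all coalesced ranges.
    """
    pending = iter(requested)
    current = next(pending, None)
    data_map = {}
    for start, length, ranges in coalesced:
        if len(data) < length:
            raise ValueError('short read at %d: wanted %d, got %d'
                             % (start, length, len(data)))
        for suboffset, subsize in ranges:
            key = (start + suboffset, subsize)
            data_map[key] = data[suboffset:suboffset + subsize]
        data = data[length:]
        # Hand back whatever can now be served in the caller's order.
        while current is not None and current in data_map:
            yield current[0], data_map.pop(current)
            current = next(pending, None)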


class SmartClientMediumRequest(object):
    """A request on a SmartClientMedium.

    Each request allows bytes to be provided to it via accept_bytes, and then
    the response bytes to be read via read_bytes.

    For example::

        request.accept_bytes('123')
        request.finished_writing()
        result = request.read_bytes(3)
        request.finished_reading()

    It is up to the individual SmartClientMedium whether multiple concurrent
    requests can exist. See SmartClientMedium.get_request to obtain instances
    of SmartClientMediumRequest, and the concrete Medium you are using for
    details on concurrency and pipelining.
    """

    def __init__(self, medium):
        """Construct a SmartClientMediumRequest for the given medium."""
        self._medium = medium
        # we track state by constants - we may want to use the same
        # pattern as BodyReader if it gets more complex.
        # valid states are: "writing", "reading", "done"
        self._state = "writing"

    def accept_bytes(self, bytes):
        """Accept bytes for inclusion in this request.

        This method may not be called after finished_writing() has been
        called. It depends upon the Medium whether or not the bytes will be
        immediately transmitted. Message based Mediums will tend to buffer the
        bytes until finished_writing() is called.

        :param bytes: A bytestring.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._accept_bytes(bytes)

    def _accept_bytes(self, bytes):
        """Helper for accept_bytes.

        accept_bytes checks the state of the request to determine whether
        bytes should be accepted, then hands off to _accept_bytes to accept
        them.
        """
        raise NotImplementedError(self._accept_bytes)

    def finished_reading(self):
        """Inform the request that all desired data has been read.

        This will remove the request from the pipeline for its medium (if the
        medium supports pipelining) and any further calls to methods on the
        request will raise ReadingCompleted.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        self._state = "done"
        self._finished_reading()

    def _finished_reading(self):
        """Helper for finished_reading.

        finished_reading checks the state of the request to determine if
        finished_reading is allowed, and if it is hands off to _finished_reading
        to perform the action.
        """
        raise NotImplementedError(self._finished_reading)

    def finished_writing(self):
        """Finish the writing phase of this request.

        This will flush all pending data for this request along the medium.
        After calling finished_writing, you may not call accept_bytes anymore.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._state = "reading"
        self._finished_writing()

    def _finished_writing(self):
        """Helper for finished_writing.

        finished_writing checks the state of the request to determine if
        finished_writing is allowed, and if it is hands off to _finished_writing
        to perform the action.
        """
        raise NotImplementedError(self._finished_writing)

    def read_bytes(self, count):
        """Read bytes from this request's response.

        This method will block and wait for count bytes to be read. It may not
        be invoked until finished_writing() has been called - this is to ensure
        a message-based approach to requests, for compatibility with message
        based mediums like HTTP.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        return self._read_bytes(count)

    def _read_bytes(self, count):
        """Helper for read_bytes.

        read_bytes checks the state of the request to determine whether bytes
        should be read, then hands off to _read_bytes to read them.
        """
        raise NotImplementedError(self._read_bytes)
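

# Illustrative sketch only: a toy, in-memory request that follows the
# "writing" -> "reading" -> "done" state machine of SmartClientMediumRequest.
# The class name ``LoopbackRequestSketch`` and its echo behaviour are
# assumptions made for this example; it does not talk to a real medium, and
# it raises plain RuntimeError where the class above raises bzrlib errors.
class LoopbackRequestSketch(object):

    def __init__(self):
        self._state = "writing"
        self._written = []
        self._response = b''

    def accept_bytes(self, data):
        if self._state != "writing":
            raise RuntimeError('writing already completed')
        self._written.append(data)

    def finished_writing(self):
        if self._state != "writing":
            raise RuntimeError('writing already completed')
        self._state = "reading"
        # Echo the request back as the response.
        self._response = b''.join(self._written)

    def read_bytes(self, count):
        if self._state == "writing":
            raise RuntimeError('writing not complete')
        if self._state != "reading":
            raise RuntimeError('reading already completed')
        result = self._response[:count]
        self._response = self._response[count:]
        return result

    def finished_reading(self):
        if self._state == "writing":
            raise RuntimeError('writing not complete')
        if self._state != "reading":
            raise RuntimeError('reading already completed')
        self._state = "done"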


class SmartClientStreamMediumRequest(SmartClientMediumRequest):
    """A SmartClientMediumRequest that works with a SmartClientStreamMedium."""

    def __init__(self, medium):
        SmartClientMediumRequest.__init__(self, medium)
        # check that we are safe concurrency wise. If some streams start
        # allowing concurrent requests - i.e. via multiplexing - then this
        # assert should be moved to SmartClientStreamMedium.get_request,
        # and the setting/unsetting of _current_request likewise moved into
        # that class : but it's unneeded overhead for now. RBC 20060922
        if self._medium._current_request is not None:
            raise errors.TooManyConcurrentRequests(self._medium)
        self._medium._current_request = self

    def _accept_bytes(self, bytes):
        """See SmartClientMediumRequest._accept_bytes.

        This forwards to self._medium._accept_bytes because we are operating
        on the medium's stream.
        """
        self._medium._accept_bytes(bytes)

    def _finished_reading(self):
        """See SmartClientMediumRequest._finished_reading.

        This clears the _current_request on self._medium to allow a new
        request to be created.
        """
        assert self._medium._current_request is self
        self._medium._current_request = None

    def _finished_writing(self):
        """See SmartClientMediumRequest._finished_writing.

        This invokes self._medium._flush to ensure all bytes are transmitted.
        """
        self._medium._flush()

    def _read_bytes(self, count):
        """See SmartClientMediumRequest._read_bytes.

        This forwards to self._medium._read_bytes because we are operating
        on the medium's stream.
        """
        return self._medium._read_bytes(count)


class SmartClientRequestProtocolOne(SmartProtocolBase):
    """The client-side protocol for smart version 1."""

    def __init__(self, request):
        """Construct a SmartClientRequestProtocolOne.

        :param request: A SmartClientMediumRequest to serialise onto and
            deserialise from.
        """
        self._request = request
        self._body_buffer = None

    def call(self, *args):
        bytes = _encode_tuple(args)
        self._request.accept_bytes(bytes)
        self._request.finished_writing()

    def call_with_body_bytes(self, args, body):
        """Make a remote call of args with body bytes 'body'.

        After calling this, call read_response_tuple to find the result out.
        """
        bytes = _encode_tuple(args)
        self._request.accept_bytes(bytes)
        bytes = self._encode_bulk_data(body)
        self._request.accept_bytes(bytes)
        self._request.finished_writing()

    def call_with_body_readv_array(self, args, body):
        """Make a remote call with a readv array.

        The body is encoded with one line per readv offset pair. The numbers in
        each pair are separated by a comma, and no trailing newline is emitted.
        """
        bytes = _encode_tuple(args)
        self._request.accept_bytes(bytes)
        readv_bytes = self._serialise_offsets(body)
        bytes = self._encode_bulk_data(readv_bytes)
        self._request.accept_bytes(bytes)
        self._request.finished_writing()

    def cancel_read_body(self):
        """After expecting a body, a response code may indicate there is none.

        This method lets the domain client inform the protocol that no body
        will be transmitted. This is a terminal method: after calling it the
        protocol is not able to be used further.
        """
        self._request.finished_reading()

    def read_response_tuple(self, expect_body=False):
        """Read a response tuple from the wire.

        This should only be called once.
        """
        result = self._recv_tuple()
        if not expect_body:
            self._request.finished_reading()
        return result

    def read_body_bytes(self, count=-1):
        """Read bytes from the body, decoding into a byte stream.

        We read all bytes at once to ensure we've checked the trailer for
        errors, and then feed the buffer back as read_body_bytes is called.
        """
        if self._body_buffer is not None:
            return self._body_buffer.read(count)
        _body_decoder = LengthPrefixedBodyDecoder()
        # grab a byte from the wire: we do this so that we don't use too much
        # from the wire; we should have the LengthPrefixedBodyDecoder tell
        # us how much is needed once the header is written.
        # i.e. self._body_decoder.next_read_size() would be a hint.
        while not _body_decoder.finished_reading:
            byte = self._request.read_bytes(1)
            _body_decoder.accept_bytes(byte)
        self._request.finished_reading()
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
        # XXX: TODO check the trailer result.
        return self._body_buffer.read(count)

    def _recv_tuple(self):
        """Receive a tuple from the medium request."""
        line = ''
        while not line or line[-1] != '\n':
            # yes, this is inefficient - but tuples are short.
            new_char = self._request.read_bytes(1)
            line += new_char
            assert new_char != '', "end of file reading from server."
        return _decode_tuple(line)

    def query_version(self):
        """Return protocol version number of the server."""
        resp = self.read_response_tuple()
        if resp == ('ok', '1'):
            return 1
        else:
            raise errors.SmartProtocolError("bad response %r" % (resp,))
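

# Illustrative sketch only: how a version 1 request might look on the wire,
# based on the grammar in the module docstring (fields separated by Ctrl-A,
# bulk data as a length line, the body, then a 'done' trailer) and on the
# docstring of call_with_body_readv_array (one 'start,length' line per pair,
# no trailing newline). The function names and the newline terminating the
# tuple are assumptions for this example; the module's own _encode_tuple,
# _encode_bulk_data and _serialise_offsets helpers are authoritative.
def encode_tuple_sketch(args):
    """Join byte-string fields with Ctrl-A and terminate with a newline."""
    return b'\x01'.join(args) + b'\n'


def encode_bulk_data_sketch(body):
    """Encode body as one length-prefixed chunk plus the success trailer."""
    return str(len(body)).encode('ascii') + b'\n' + body + b'done\n'


def serialise_offsets_sketch(offsets):
    """Encode (start, length) pairs, one 'start,length' line per pair."""
    lines = [('%d,%d' % (start, length)).encode('ascii')
             for start, length in offsets]
    return b'\n'.join(lines)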


class SmartClientMedium(object):
    """A medium over which smart protocol requests are sent."""

    def disconnect(self):
        """If this medium maintains a persistent connection, close it.

        The default implementation does nothing.
        """


class SmartClientStreamMedium(SmartClientMedium):
    """Stream based medium common class.

    SmartClientStreamMediums operate on a stream. All subclasses use a common
    SmartClientStreamMediumRequest for their requests, and should implement
    _accept_bytes and _read_bytes to allow the request objects to send and
    receive bytes.
    """

    def __init__(self):
        self._current_request = None

    def accept_bytes(self, bytes):
        self._accept_bytes(bytes)

        """The SmartClientStreamMedium knows how to close the stream when it is
        finished with it.
        """

    def _flush(self):
        """Flush the output stream.

        This method is used by the SmartClientStreamMediumRequest to ensure that
        all data for a request is sent, to avoid long timeouts or deadlocks.
        """
        raise NotImplementedError(self._flush)

    def get_request(self):
        """See SmartClientMedium.get_request().

        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
        for get_request.
        """
        return SmartClientStreamMediumRequest(self)

    def read_bytes(self, count):
        return self._read_bytes(count)


class SmartSimplePipesClientMedium(SmartClientStreamMedium):
    """A client medium using simple pipes.

    This client does not manage the pipes: it assumes they will always be open.
    """

    def __init__(self, readable_pipe, writeable_pipe):
        SmartClientStreamMedium.__init__(self)
        self._readable_pipe = readable_pipe
        self._writeable_pipe = writeable_pipe

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._writeable_pipe.write(bytes)

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._writeable_pipe.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium._read_bytes."""
        return self._readable_pipe.read(count)


class SmartSSHClientMedium(SmartClientStreamMedium):
    """A client medium using SSH."""

    def __init__(self, host, port=None, username=None, password=None,
            vendor=None):
        """Creates a client that will connect on the first use.

        :param vendor: An optional override for the ssh vendor to use. See
            bzrlib.transport.ssh for details on ssh vendors.
        """
        SmartClientStreamMedium.__init__(self)
        self._connected = False
        self._host = host
        self._password = password
        self._port = port
        self._username = username
        self._read_from = None
        self._ssh_connection = None
        self._vendor = vendor
        self._write_to = None

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._ensure_connection()
        self._write_to.write(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._read_from.close()
        self._write_to.close()
        self._ssh_connection.close()
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected."""
        if self._connected:
            return
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
        if self._vendor is None:
            vendor = ssh._get_ssh_vendor()
        else:
            vendor = self._vendor
        self._ssh_connection = vendor.connect_ssh(self._username,
                self._password, self._host, self._port,
                command=[executable, 'serve', '--inet', '--directory=/',
        self._read_from, self._write_to = \
            self._ssh_connection.get_filelike_channels()
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._write_to.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium.read_bytes."""
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._read_from.read(count)


class SmartTCPClientMedium(SmartClientStreamMedium):
    """A client medium using TCP."""

    def __init__(self, host, port):
        """Creates a client that will connect on the first use."""
        SmartClientStreamMedium.__init__(self)
        self._connected = False
        self._host = host
        self._port = port

    def _accept_bytes(self, bytes):
        """See SmartClientMedium.accept_bytes."""
        self._ensure_connection()
        self._socket.sendall(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._socket.close()
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected."""
        if self._connected:
            return
        self._socket = socket.socket()
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        result = self._socket.connect_ex((self._host, int(self._port)))
        if result:
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
                    (self._host, self._port, os.strerror(result)))
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush().

        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
        add a means to do a flush, but that can be done in the future.
        """

    def _read_bytes(self, count):
        """See SmartClientMedium.read_bytes."""
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._socket.recv(count)


class SmartTCPTransport(SmartTransport):
    """Connection to smart server over plain tcp.

    This is essentially just a factory to get 'RemoteTransport(url,
        SmartTCPClientMedium)'.
    """

    def __init__(self, url):
        _scheme, _username, _password, _host, _port, _path = \
            transport.split_url(url)
        try:
            _port = int(_port)
        except (ValueError, TypeError), e:
            raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
        medium = SmartTCPClientMedium(_host, _port)
        super(SmartTCPTransport, self).__init__(url, medium=medium)


try:
    from bzrlib.transport import sftp, ssh
except errors.ParamikoNotPresent:
    # no paramiko, no SSHTransport.
    pass


class SmartSSHTransport(SmartTransport):
    """Connection to smart server over SSH.

    This is essentially just a factory to get 'RemoteTransport(url,
        SmartSSHClientMedium)'.
    """

    def __init__(self, url):
        _scheme, _username, _password, _host, _port, _path = \
            transport.split_url(url)
        try:
            if _port is not None:
                _port = int(_port)
        except (ValueError, TypeError), e:
            raise errors.InvalidURL(path=url, extra="invalid port %s" %
                _port)
        medium = SmartSSHClientMedium(_host, _port, _username, _password)
        super(SmartSSHTransport, self).__init__(url, medium=medium)


def get_test_permutations():
    """Return (transport, server) permutations for testing."""
    ### We may need a little more test framework support to construct an
    ### appropriate RemoteTransport in the future.
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]