1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Smart-server protocol, client and server.
19
Requests are sent as a command and list of arguments, followed by optional
20
bulk body data. Responses are similarly a response and list of arguments,
21
followed by bulk body data. ::
24
Fields are separated by Ctrl-A.
25
BULK_DATA := CHUNK+ TRAILER
26
Chunks can be repeated as many times as necessary.
27
CHUNK := CHUNK_LEN CHUNK_BODY
28
CHUNK_LEN := DIGIT+ NEWLINE
29
Gives the number of bytes in the following chunk.
30
CHUNK_BODY := BYTE[chunk_len]
31
TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
32
SUCCESS_TRAILER := 'done' NEWLINE
35
Paths are passed across the network. The client needs to see a namespace that
36
includes any repository that might need to be referenced, and the client needs
37
to know about a root directory beyond which it cannot ascend.
39
Servers run over ssh will typically want to be able to access any path the user
40
can access. Public servers on the other hand (which might be over http, ssh
41
or tcp) will typically want to restrict access to only a particular directory
42
and its children, so will want to do a software virtual root at that level.
43
In other words they'll want to rewrite incoming paths to be under that level
44
(and prevent escaping using ../ tricks.)
46
URLs that include ~ should probably be passed across to the server verbatim
47
and the server can expand them. This will proably not be meaningful when
48
limited to a directory?
50
At the bottom level socket, pipes, HTTP server. For sockets, we have the
51
idea that you have multiple requests and get have a read error because the
52
other side did shutdown sd send. For pipes we have read pipe which will have a
53
zero read which marks end-of-file. For HTTP server environment there is not
54
end-of-stream because each request coming into the server is independent.
56
So we need a wrapper around pipes and sockets to seperate out reqeusts from
57
substrate and this will give us a single model which is consist for HTTP,
63
MEDIUM (factory for protocol, reads bytes & pushes to protocol,
64
uses protocol to detect end-of-request, sends written
65
bytes to client) e.g. socket, pipe, HTTP request handler.
70
PROTOCOL (serialisation, deserialisation) accepts bytes for one
71
request, decodes according to internal state, pushes
72
structured data to handler. accepts structured data from
73
handler and encodes and writes to the medium. factory for
79
HANDLER (domain logic) accepts structured data, operates state
80
machine until the request can be satisfied,
81
sends structured data to the protocol.
87
CLIENT domain logic, accepts domain requests, generated structured
88
data, reads structured data from responses and turns into
89
domain data. Sends structured data to the protocol.
90
Operates state machines until the request can be delivered
91
(e.g. reading from a bundle generated in bzrlib to deliver a
94
Possibly this should just be RemoteBzrDir, RemoteTransport,
100
PROTOCOL (serialisation, deserialisation) accepts structured data for one
101
request, encodes and writes to the medium. Reads bytes from the
102
medium, decodes and allows the client to read structured data.
107
MEDIUM (accepts bytes from the protocol & delivers to the remote server.
108
Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
112
# TODO: _translate_error should be on the client, not the transport because
113
# error coding is wire protocol specific.
115
# TODO: A plain integer from query_version is too simple; should give some
118
# TODO: Server should probably catch exceptions within itself and send them
119
# back across the network. (But shouldn't catch KeyboardInterrupt etc)
120
# Also needs to somehow report protocol errors like bad requests. Need to
121
# consider how we'll handle error reporting, e.g. if we get halfway through a
122
# bulk transfer and then something goes wrong.
124
# TODO: Standard marker at start of request/response lines?
126
# TODO: Make each request and response self-validatable, e.g. with checksums.
128
# TODO: get/put objects could be changed to gradually read back the data as it
129
# comes across the network
131
# TODO: What should the server do if it hits an error and has to terminate?
133
# TODO: is it useful to allow multiple chunks in the bulk data?
135
# TODO: If we get an exception during transmission of bulk data we can't just
136
# emit the exception because it won't be seen.
137
# John proposes: I think it would be worthwhile to have a header on each
138
# chunk, that indicates it is another chunk. Then you can send an 'error'
139
# chunk as long as you finish the previous chunk.
141
# TODO: Clone method on Transport; should work up towards parent directory;
142
# unclear how this should be stored or communicated to the server... maybe
143
# just pass it on all relevant requests?
145
# TODO: Better name than clone() for changing between directories. How about
146
# open_dir or change_dir or chdir?
148
# TODO: Is it really good to have the notion of current directory within the
149
# connection? Perhaps all Transports should factor out a common connection
150
# from the thing that has the directory context?
152
# TODO: Pull more things common to sftp and ssh to a higher level.
154
# TODO: The server that manages a connection should be quite small and retain
155
# minimum state because each of the requests are supposed to be stateless.
156
# Then we can write another implementation that maps to http.
158
# TODO: What to do when a client connection is garbage collected? Maybe just
159
# abruptly drop the connection?
161
# TODO: Server in some cases will need to restrict access to files outside of
162
# a particular root directory. LocalTransport doesn't do anything to stop you
163
# ascending above the base directory, so we need to prevent paths
164
# containing '..' in either the server or transport layers. (Also need to
165
# consider what happens if someone creates a symlink pointing outside the
168
# TODO: Server should rebase absolute paths coming across the network to put
169
# them under the virtual root, if one is in use. LocalTransport currently
170
# doesn't do that; if you give it an absolute path it just uses it.
172
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
173
# urlescape them instead. Indeed possibly this should just literally be
176
# FIXME: This transport, with several others, has imperfect handling of paths
177
# within urls. It'd probably be better for ".." from a root to raise an error
178
# rather than return the same directory as we do at present.
180
# TODO: Rather than working at the Transport layer we want a Branch,
181
# Repository or BzrDir objects that talk to a server.
183
# TODO: Probably want some way for server commands to gradually produce body
184
# data rather than passing it as a string; they could perhaps pass an
185
# iterator-like callback that will gradually yield data; it probably needs a
186
# close() method that will always be closed to do any necessary cleanup.
188
# TODO: Split the actual smart server from the ssh encoding of it.
190
# TODO: Perhaps support file-level readwrite operations over the transport
193
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
194
# branch doing file-level operations.
196
# TODO: jam 20060915 _decode_tuple is acting directly on input over
197
# the socket, and it assumes everything is UTF8 sections separated
198
# by \001. Which means a request like '\002' Will abort the connection
199
# because of a UnicodeDecodeError. It does look like invalid data will
200
# kill the SmartServerStreamMedium, but only with an abort + exception, and
201
# the overall server shouldn't die.
203
from cStringIO import StringIO
219
from bzrlib.bundle.serializer import write_bundle
221
# must do this otherwise urllib can't parse the urls properly :(
222
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
223
transport.register_urlparse_netloc_protocol(scheme)
227
def _recv_tuple(from_file):
228
req_line = from_file.readline()
229
return _decode_tuple(req_line)
232
def _decode_tuple(req_line):
233
if req_line == None or req_line == '':
235
if req_line[-1] != '\n':
236
raise errors.SmartProtocolError("request %r not terminated" % req_line)
237
return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
240
def _encode_tuple(args):
241
"""Encode the tuple args to a bytestream."""
242
return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
245
class SmartProtocolBase(object):
246
"""Methods common to client and server"""
248
# TODO: this only actually accomodates a single block; possibly should support
250
def _recv_bulk(self):
251
# This is OBSOLETE except for the double handline in the server:
252
# the read_bulk + reencode noise.
253
chunk_len = self._in.readline()
255
chunk_len = int(chunk_len)
257
raise errors.SmartProtocolError("bad chunk length line %r" % chunk_len)
258
bulk = self._in.read(chunk_len)
259
if len(bulk) != chunk_len:
260
raise errors.SmartProtocolError("short read fetching bulk data chunk")
264
def _recv_trailer(self):
265
resp = self._recv_tuple()
266
if resp == ('done', ):
269
self._translate_error(resp)
271
def _encode_bulk_data(self, body):
272
"""Encode body as a bulk data chunk."""
273
return ''.join(('%d\n' % len(body), body, 'done\n'))
275
def _serialise_offsets(self, offsets):
276
"""Serialise a readv offset list."""
278
for start, length in offsets:
279
txt.append('%d,%d' % (start, length))
280
return '\n'.join(txt)
282
def _send_bulk_data(self, body, a_file=None):
283
"""Send chunked body data"""
284
assert isinstance(body, str)
285
bytes = self._encode_bulk_data(body)
286
self._write_and_flush(bytes, a_file)
288
def _write_and_flush(self, bytes, a_file=None):
289
"""Write bytes to self._out and flush it."""
290
# XXX: this will be inefficient. Just ask Robert.
297
class SmartServerRequestProtocolOne(SmartProtocolBase):
298
"""Server-side encoding and decoding logic for smart version 1."""
300
def __init__(self, output_stream, backing_transport):
301
self._out_stream = output_stream
302
self._backing_transport = backing_transport
303
self.finished_reading = False
305
self.has_dispatched = False
307
self._body_decoder = None
309
def accept_bytes(self, bytes):
310
"""Take bytes, and advance the internal state machine appropriately.
312
:param bytes: must be a byte string
314
assert isinstance(bytes, str)
315
self.in_buffer += bytes
316
if not self.has_dispatched:
317
if '\n' not in self.in_buffer:
318
# no command line yet
320
self.has_dispatched = True
321
# XXX if in_buffer not \n-terminated this will do the wrong
324
assert self.in_buffer.endswith('\n')
325
req_args = _decode_tuple(self.in_buffer)
327
self.request = SmartServerRequestHandler(
328
self._backing_transport)
329
self.request.dispatch_command(req_args[0], req_args[1:])
330
if self.request.finished_reading:
332
self._send_response(self.request.response.args,
333
self.request.response.body)
334
self.sync_with_request(self.request)
336
except KeyboardInterrupt:
338
except Exception, exception:
339
# everything else: pass to client, flush, and quit
340
self._send_response(('error', str(exception)))
344
if self.finished_reading:
345
# nothing to do.XXX: this routine should be a single state
348
if self._body_decoder is None:
349
self._body_decoder = LengthPrefixedBodyDecoder()
350
self._body_decoder.accept_bytes(self.in_buffer)
351
self.in_buffer = self._body_decoder.unused_data
352
body_data = self._body_decoder.read_pending_data()
353
self.request.accept_body(body_data)
354
if self._body_decoder.finished_reading:
355
self.request.end_of_body()
356
assert self.request.finished_reading, \
357
"no more body, request not finished"
358
self.sync_with_request(self.request)
359
if self.request.response is not None:
360
self._send_response(self.request.response.args,
361
self.request.response.body)
363
assert not self.request.finished_reading, \
364
"no response and we have finished reading."
366
def _send_response(self, args, body=None):
367
"""Send a smart server response down the output stream."""
368
self._out_stream.write(_encode_tuple(args))
370
self._out_stream.flush()
372
self._send_bulk_data(body, self._out_stream)
373
#self._out_stream.write('BLARGH')
375
def sync_with_request(self, request):
376
self.finished_reading = request.finished_reading
379
class LengthPrefixedBodyDecoder(object):
380
"""Decodes the length-prefixed bulk data."""
383
self.finished_reading = False
384
self.unused_data = ''
385
self.state_accept = self._state_accept_expecting_length
386
self.state_read = self._state_read_no_data
388
self._trailer_buffer = ''
390
def accept_bytes(self, bytes):
391
"""Decode as much of bytes as possible.
393
If 'bytes' contains too much data it will be appended to
396
finished_reading will be set when no more data is required. Further
397
data will be appended to self.unused_data.
399
# accept_bytes is allowed to change the state
400
current_state = self.state_accept
401
self.state_accept(bytes)
402
while current_state != self.state_accept:
403
current_state = self.state_accept
404
self.state_accept('')
406
def read_pending_data(self):
407
"""Return any pending data that has been decoded."""
408
return self.state_read()
410
def _state_accept_expecting_length(self, bytes):
411
self._in_buffer += bytes
412
pos = self._in_buffer.find('\n')
415
self.bytes_left = int(self._in_buffer[:pos])
416
self._in_buffer = self._in_buffer[pos+1:]
417
self.bytes_left -= len(self._in_buffer)
418
self.state_accept = self._state_accept_reading_body
419
self.state_read = self._state_read_in_buffer
421
def _state_accept_reading_body(self, bytes):
422
self._in_buffer += bytes
423
self.bytes_left -= len(bytes)
424
if self.bytes_left <= 0:
426
if self.bytes_left != 0:
427
self._trailer_buffer = self._in_buffer[self.bytes_left:]
428
self._in_buffer = self._in_buffer[:self.bytes_left]
429
self.state_accept = self._state_accept_reading_trailer
431
def _state_accept_reading_trailer(self, bytes):
432
self._trailer_buffer += bytes
433
if self._trailer_buffer.startswith('done\n'):
434
self.unused_data = self._trailer_buffer[len('done\n'):]
435
self.state_accept = self._state_accept_reading_unused
436
self.finished_reading = True
438
def _state_accept_reading_unused(self, bytes):
439
self.unused_data += bytes
441
def _state_read_no_data(self):
444
def _state_read_in_buffer(self):
445
result = self._in_buffer
450
class SmartServerStreamMedium(SmartProtocolBase):
451
"""Handles smart commands coming over a stream.
453
The stream may be a pipe connected to sshd, or a tcp socket, or an
454
in-process fifo for testing.
456
One instance is created for each connected client; it can serve multiple
457
requests in the lifetime of the connection.
459
The server passes requests through to an underlying backing transport,
460
which will typically be a LocalTransport looking at the server's filesystem.
463
def __init__(self, in_file, out_file, backing_transport):
464
"""Construct new server.
466
:param in_file: Python file from which requests can be read.
467
:param out_file: Python file to write responses.
468
:param backing_transport: Transport for the directory served.
472
self.backing_transport = backing_transport
474
def _recv_tuple(self):
475
"""Read a request from the client and return as a tuple.
477
Returns None at end of file (if the client closed the connection.)
479
# ** Deserialise and read bytes
480
return _recv_tuple(self._in)
482
def _send_tuple(self, args):
483
"""Send response header"""
484
# ** serialise and write bytes
485
return self._write_and_flush(_encode_tuple(args))
487
def _send_error_and_disconnect(self, exception):
488
# ** serialise and write bytes
489
self._send_tuple(('error', str(exception)))
493
def _serve_one_request(self):
494
"""Read one request from input, process, send back a response.
496
:return: False if the server should terminate, otherwise None.
498
# ** deserialise, read bytes, serialise and write bytes
499
req_line = self._in.readline()
500
# this should just test "req_line == ''", surely? -- Andrew Bennetts
501
if req_line in ('', None):
502
# client closed connection
503
return False # shutdown server
505
protocol = SmartServerRequestProtocolOne(self._out,
506
self.backing_transport)
507
protocol.accept_bytes(req_line)
508
if not protocol.finished_reading:
509
# this boils down to readline which wont block on open sockets
510
# without data. We should really though read as much as is
511
# available and then hand to that accept_bytes without this
512
# silly double-decode.
513
bulk = self._recv_bulk()
514
bulk_bytes = ''.join(('%d\n' % len(bulk), bulk, 'done\n'))
515
protocol.accept_bytes(bulk_bytes)
516
# might be nice to do protocol.end_of_bytes()
517
# because self._recv_bulk reads all the bytes, this must finish
518
# after one delivery of data rather than looping.
519
assert protocol.finished_reading
520
except KeyboardInterrupt:
523
# everything else: pass to client, flush, and quit
524
self._send_error_and_disconnect(e)
528
"""Serve requests until the client disconnects."""
529
# Keep a reference to stderr because the sys module's globals get set to
530
# None during interpreter shutdown.
531
from sys import stderr
533
while self._serve_one_request() != False:
536
stderr.write("%s terminating on exception %s\n" % (self, e))
540
class SmartServerResponse(object):
541
"""Response generated by SmartServerRequestHandler."""
543
def __init__(self, args, body=None):
547
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
548
# for delivering the data for a request. This could be done with as the
549
# StreamServer, though that would create conflation between request and response
550
# which may be undesirable.
553
class SmartServerRequestHandler(object):
554
"""Protocol logic for smart server.
556
This doesn't handle serialization at all, it just processes requests and
560
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
561
# not contain encoding or decoding logic to allow the wire protocol to vary
562
# from the object protocol: we will want to tweak the wire protocol separate
563
# from the object model, and ideally we will be able to do that without
564
# having a SmartServerRequestHandler subclass for each wire protocol, rather
565
# just a Protocol subclass.
567
# TODO: Better way of representing the body for commands that take it,
568
# and allow it to be streamed into the server.
570
def __init__(self, backing_transport):
571
self._backing_transport = backing_transport
572
self._converted_command = False
573
self.finished_reading = False
574
self._body_bytes = ''
577
def accept_body(self, bytes):
580
This should be overriden for each command that desired body data to
581
handle the right format of that data. I.e. plain bytes, a bundle etc.
583
The deserialisation into that format should be done in the Protocol
584
object. Set self.desired_body_format to the format your method will
587
# default fallback is to accumulate bytes.
588
self._body_bytes += bytes
590
def _end_of_body_handler(self):
591
"""An unimplemented end of body handler."""
592
raise NotImplementedError(self._end_of_body_handler)
595
"""Answer a version request with my version."""
596
return SmartServerResponse(('ok', '1'))
598
def do_has(self, relpath):
599
r = self._backing_transport.has(relpath) and 'yes' or 'no'
600
return SmartServerResponse((r,))
602
def do_get(self, relpath):
603
backing_bytes = self._backing_transport.get_bytes(relpath)
604
return SmartServerResponse(('ok',), backing_bytes)
606
def _deserialise_optional_mode(self, mode):
607
# XXX: FIXME this should be on the protocol object.
613
def do_append(self, relpath, mode):
614
self._converted_command = True
615
self._relpath = relpath
616
self._mode = self._deserialise_optional_mode(mode)
617
self._end_of_body_handler = self._handle_do_append_end
619
def _handle_do_append_end(self):
620
old_length = self._backing_transport.append_bytes(
621
self._relpath, self._body_bytes, self._mode)
622
self.response = SmartServerResponse(('appended', '%d' % old_length))
624
def do_delete(self, relpath):
625
self._backing_transport.delete(relpath)
627
def do_iter_files_recursive(self, abspath):
628
# XXX: the path handling needs some thought.
629
#relpath = self._backing_transport.relpath(abspath)
630
transport = self._backing_transport.clone(abspath)
631
filenames = transport.iter_files_recursive()
632
return SmartServerResponse(('names',) + tuple(filenames))
634
def do_list_dir(self, relpath):
635
filenames = self._backing_transport.list_dir(relpath)
636
return SmartServerResponse(('names',) + tuple(filenames))
638
def do_mkdir(self, relpath, mode):
639
self._backing_transport.mkdir(relpath,
640
self._deserialise_optional_mode(mode))
642
def do_move(self, rel_from, rel_to):
643
self._backing_transport.move(rel_from, rel_to)
645
def do_put(self, relpath, mode):
646
self._converted_command = True
647
self._relpath = relpath
648
self._mode = self._deserialise_optional_mode(mode)
649
self._end_of_body_handler = self._handle_do_put
651
def _handle_do_put(self):
652
self._backing_transport.put_bytes(self._relpath,
653
self._body_bytes, self._mode)
654
self.response = SmartServerResponse(('ok',))
656
def _deserialise_offsets(self, text):
657
# XXX: FIXME this should be on the protocol object.
659
for line in text.split('\n'):
662
start, length = line.split(',')
663
offsets.append((int(start), int(length)))
666
def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
667
self._converted_command = True
668
self._end_of_body_handler = self._handle_put_non_atomic
669
self._relpath = relpath
670
self._dir_mode = self._deserialise_optional_mode(dir_mode)
671
self._mode = self._deserialise_optional_mode(mode)
672
# a boolean would be nicer XXX
673
self._create_parent = (create_parent == 'T')
675
def _handle_put_non_atomic(self):
676
self._backing_transport.put_bytes_non_atomic(self._relpath,
679
create_parent_dir=self._create_parent,
680
dir_mode=self._dir_mode)
681
self.response = SmartServerResponse(('ok',))
683
def do_readv(self, relpath):
684
self._converted_command = True
685
self._end_of_body_handler = self._handle_readv_offsets
686
self._relpath = relpath
688
def end_of_body(self):
689
"""No more body data will be received."""
690
self._run_handler_code(self._end_of_body_handler, (), {})
691
# cannot read after this.
692
self.finished_reading = True
694
def _handle_readv_offsets(self):
695
"""accept offsets for a readv request."""
696
offsets = self._deserialise_offsets(self._body_bytes)
697
backing_bytes = ''.join(bytes for offset, bytes in
698
self._backing_transport.readv(self._relpath, offsets))
699
self.response = SmartServerResponse(('readv',), backing_bytes)
701
def do_rename(self, rel_from, rel_to):
702
self._backing_transport.rename(rel_from, rel_to)
704
def do_rmdir(self, relpath):
705
self._backing_transport.rmdir(relpath)
707
def do_stat(self, relpath):
708
stat = self._backing_transport.stat(relpath)
709
return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
711
def do_get_bundle(self, path, revision_id):
712
# open transport relative to our base
713
t = self._backing_transport.clone(path)
714
control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
715
repo = control.open_repository()
716
tmpf = tempfile.TemporaryFile()
717
base_revision = revision.NULL_REVISION
718
write_bundle(repo, revision_id, base_revision, tmpf)
720
return SmartServerResponse((), tmpf.read())
722
def dispatch_command(self, cmd, args):
723
"""Deprecated compatibility method.""" # XXX XXX
724
func = getattr(self, 'do_' + cmd, None)
726
raise errors.SmartProtocolError("bad request %r" % (cmd,))
727
self._run_handler_code(func, args, {})
729
def _run_handler_code(self, callable, args, kwargs):
730
"""Run some handler specific code 'callable'.
732
If a result is returned, it is considered to be the commands response,
733
and finished_reading is set true, and its assigned to self.response.
735
Any exceptions caught are translated and a response object created
738
result = self._call_converting_errors(callable, args, kwargs)
739
if result is not None:
740
self.response = result
741
self.finished_reading = True
742
# handle unconverted commands
743
if not self._converted_command:
744
self.finished_reading = True
746
self.response = SmartServerResponse(('ok',))
748
def _call_converting_errors(self, callable, args, kwargs):
749
"""Call callable converting errors to Response objects."""
751
return callable(*args, **kwargs)
752
except errors.NoSuchFile, e:
753
return SmartServerResponse(('NoSuchFile', e.path))
754
except errors.FileExists, e:
755
return SmartServerResponse(('FileExists', e.path))
756
except errors.DirectoryNotEmpty, e:
757
return SmartServerResponse(('DirectoryNotEmpty', e.path))
758
except errors.ShortReadvError, e:
759
return SmartServerResponse(('ShortReadvError',
760
e.path, str(e.offset), str(e.length), str(e.actual)))
761
except UnicodeError, e:
762
# If it is a DecodeError, than most likely we are starting
763
# with a plain string
764
str_or_unicode = e.object
765
if isinstance(str_or_unicode, unicode):
766
val = u'u:' + str_or_unicode
768
val = u's:' + str_or_unicode.encode('base64')
769
# This handles UnicodeEncodeError or UnicodeDecodeError
770
return SmartServerResponse((e.__class__.__name__,
771
e.encoding, val, str(e.start), str(e.end), e.reason))
772
except errors.TransportNotPossible, e:
773
if e.msg == "readonly transport":
774
return SmartServerResponse(('ReadOnlyError', ))
779
class SmartTCPServer(object):
780
"""Listens on a TCP socket and accepts connections from smart clients"""
782
def __init__(self, backing_transport, host='127.0.0.1', port=0):
783
"""Construct a new server.
785
To actually start it running, call either start_background_thread or
788
:param host: Name of the interface to listen on.
789
:param port: TCP port to listen on, or 0 to allocate a transient port.
791
self._server_socket = socket.socket()
792
self._server_socket.bind((host, port))
793
self.port = self._server_socket.getsockname()[1]
794
self._server_socket.listen(1)
795
self._server_socket.settimeout(1)
796
self.backing_transport = backing_transport
799
# let connections timeout so that we get a chance to terminate
800
# Keep a reference to the exceptions we want to catch because the socket
801
# module's globals get set to None during interpreter shutdown.
802
from socket import timeout as socket_timeout
803
from socket import error as socket_error
804
self._should_terminate = False
805
while not self._should_terminate:
807
self.accept_and_serve()
808
except socket_timeout:
809
# just check if we're asked to stop
811
except socket_error, e:
812
trace.warning("client disconnected: %s", e)
816
"""Return the url of the server"""
817
return "bzr://%s:%d/" % self._server_socket.getsockname()
819
def accept_and_serve(self):
820
conn, client_addr = self._server_socket.accept()
821
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
822
from_client = conn.makefile('r')
823
to_client = conn.makefile('w')
824
handler = SmartServerStreamMedium(from_client, to_client,
825
self.backing_transport)
826
connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
827
connection_thread.setDaemon(True)
828
connection_thread.start()
830
def start_background_thread(self):
831
self._server_thread = threading.Thread(None,
833
name='server-' + self.get_url())
834
self._server_thread.setDaemon(True)
835
self._server_thread.start()
837
def stop_background_thread(self):
838
self._should_terminate = True
839
# self._server_socket.close()
840
# we used to join the thread, but it's not really necessary; it will
842
## self._server_thread.join()
845
class SmartTCPServer_for_testing(SmartTCPServer):
846
"""Server suitable for use by transport tests.
848
This server is backed by the process's cwd.
852
self._homedir = os.getcwd()
853
# The server is set up by default like for ssh access: the client
854
# passes filesystem-absolute paths; therefore the server must look
855
# them up relative to the root directory. it might be better to act
856
# a public server and have the server rewrite paths into the test
858
SmartTCPServer.__init__(self, transport.get_transport("file:///"))
861
"""Set up server for testing"""
862
self.start_background_thread()
865
self.stop_background_thread()
868
"""Return the url of the server"""
869
host, port = self._server_socket.getsockname()
870
# XXX: I think this is likely to break on windows -- self._homedir will
871
# have backslashes (and maybe a drive letter?).
872
# -- Andrew Bennetts, 2006-08-29
873
return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
875
def get_bogus_url(self):
876
"""Return a URL which will fail to connect"""
877
return 'bzr://127.0.0.1:1/'
880
class SmartStat(object):
882
def __init__(self, size, mode):
887
class SmartTransport(transport.Transport):
888
"""Connection to a smart server.
890
The connection holds references to pipes that can be used to send requests
893
The connection has a notion of the current directory to which it's
894
connected; this is incorporated in filenames passed to the server.
896
This supports some higher-level RPC operations and can also be treated
897
like a Transport to do file-like operations.
899
The connection can be made over a tcp socket, or (in future) an ssh pipe
900
or a series of http requests. There are concrete subclasses for each
901
type: SmartTCPTransport, etc.
904
# IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
905
# responsibilities: Put those on SmartClient or similar. This is vital for
906
# the ability to support multiple versions of the smart protocol over time:
907
# SmartTransport is an adapter from the Transport object model to the
908
# SmartClient model, not an encoder.
910
def __init__(self, url, clone_from=None, medium=None):
913
:param medium: The medium to use for this RemoteTransport. This must be
914
supplied if clone_from is None.
916
### Technically super() here is faulty because Transport's __init__
917
### fails to take 2 parameters, and if super were to choose a silly
918
### initialisation order things would blow up.
919
if not url.endswith('/'):
921
super(SmartTransport, self).__init__(url)
922
self._scheme, self._username, self._password, self._host, self._port, self._path = \
923
transport.split_url(url)
924
if clone_from is None:
925
self._medium = medium
927
# credentials may be stripped from the base in some circumstances
928
# as yet to be clearly defined or documented, so copy them.
929
self._username = clone_from._username
930
# reuse same connection
931
self._medium = clone_from._medium
932
assert self._medium is not None
934
def abspath(self, relpath):
935
"""Return the full url to the given relative path.
937
@param relpath: the relative path or path components
938
@type relpath: str or list
940
return self._unparse_url(self._remote_path(relpath))
942
def clone(self, relative_url):
943
"""Make a new SmartTransport related to me, sharing the same connection.
945
This essentially opens a handle on a different remote directory.
947
if relative_url is None:
948
return SmartTransport(self.base, self)
950
return SmartTransport(self.abspath(relative_url), self)
952
def is_readonly(self):
953
"""Smart server transport can do read/write file operations."""
956
def get_smart_client(self):
959
def get_smart_medium(self):
962
def _unparse_url(self, path):
963
"""Return URL for a path.
965
:see: SFTPUrlHandling._unparse_url
967
# TODO: Eventually it should be possible to unify this with
968
# SFTPUrlHandling._unparse_url?
971
path = urllib.quote(path)
972
netloc = urllib.quote(self._host)
973
if self._username is not None:
974
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
975
if self._port is not None:
976
netloc = '%s:%d' % (netloc, self._port)
977
return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
979
def _remote_path(self, relpath):
980
"""Returns the Unicode version of the absolute path for relpath."""
981
return self._combine_paths(self._path, relpath)
983
def _call(self, method, *args):
984
resp = self._call2(method, *args)
985
self._translate_error(resp)
987
def _call2(self, method, *args):
988
"""Call a method on the remote server."""
989
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
990
protocol.call(method, *args)
991
return protocol.read_response_tuple()
993
def _call_with_body_bytes(self, method, args, body):
994
"""Call a method on the remote server with body bytes."""
995
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
996
protocol.call_with_body_bytes((method, ) + args, body)
997
return protocol.read_response_tuple()
999
def has(self, relpath):
1000
"""Indicate whether a remote file of the given name exists or not.
1002
:see: Transport.has()
1004
resp = self._call2('has', self._remote_path(relpath))
1005
if resp == ('yes', ):
1007
elif resp == ('no', ):
1010
self._translate_error(resp)
1012
def get(self, relpath):
1013
"""Return file-like object reading the contents of a remote file.
1015
:see: Transport.get_bytes()/get_file()
1017
return StringIO(self.get_bytes(relpath))
1019
def get_bytes(self, relpath):
1020
remote = self._remote_path(relpath)
1021
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1022
protocol.call('get', remote)
1023
resp = protocol.read_response_tuple(True)
1024
if resp != ('ok', ):
1025
protocol.cancel_read_body()
1026
self._translate_error(resp, relpath)
1027
return protocol.read_body_bytes()
1029
def _serialise_optional_mode(self, mode):
1035
def mkdir(self, relpath, mode=None):
1036
resp = self._call2('mkdir', self._remote_path(relpath),
1037
self._serialise_optional_mode(mode))
1038
self._translate_error(resp)
1040
def put_bytes(self, relpath, upload_contents, mode=None):
1041
# FIXME: upload_file is probably not safe for non-ascii characters -
1042
# should probably just pass all parameters as length-delimited
1044
resp = self._call_with_body_bytes('put',
1045
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
1047
self._translate_error(resp)
1049
def put_bytes_non_atomic(self, relpath, bytes, mode=None,
1050
create_parent_dir=False,
1052
"""See Transport.put_bytes_non_atomic."""
1053
# FIXME: no encoding in the transport!
1054
create_parent_str = 'F'
1055
if create_parent_dir:
1056
create_parent_str = 'T'
1058
resp = self._call_with_body_bytes(
1060
(self._remote_path(relpath), self._serialise_optional_mode(mode),
1061
create_parent_str, self._serialise_optional_mode(dir_mode)),
1063
self._translate_error(resp)
1065
def put_file(self, relpath, upload_file, mode=None):
1066
# its not ideal to seek back, but currently put_non_atomic_file depends
1067
# on transports not reading before failing - which is a faulty
1068
# assumption I think - RBC 20060915
1069
pos = upload_file.tell()
1071
return self.put_bytes(relpath, upload_file.read(), mode)
1073
upload_file.seek(pos)
1076
def put_file_non_atomic(self, relpath, f, mode=None,
1077
create_parent_dir=False,
1079
return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
1080
create_parent_dir=create_parent_dir,
1083
def append_file(self, relpath, from_file, mode=None):
1084
return self.append_bytes(relpath, from_file.read(), mode)
1086
def append_bytes(self, relpath, bytes, mode=None):
1087
resp = self._call_with_body_bytes(
1089
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
1091
if resp[0] == 'appended':
1093
self._translate_error(resp)
1095
def delete(self, relpath):
1096
resp = self._call2('delete', self._remote_path(relpath))
1097
self._translate_error(resp)
1099
def readv(self, relpath, offsets):
1103
offsets = list(offsets)
1105
sorted_offsets = sorted(offsets)
1106
# turn the list of offsets into a stack
1107
offset_stack = iter(offsets)
1108
cur_offset_and_size = offset_stack.next()
1109
coalesced = list(self._coalesce_offsets(sorted_offsets,
1110
limit=self._max_readv_combine,
1111
fudge_factor=self._bytes_to_read_before_seek))
1113
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1114
protocol.call_with_body_readv_array(
1115
('readv', self._remote_path(relpath)),
1116
[(c.start, c.length) for c in coalesced])
1117
resp = protocol.read_response_tuple(True)
1119
if resp[0] != 'readv':
1120
# This should raise an exception
1121
protocol.cancel_read_body()
1122
self._translate_error(resp)
1125
# FIXME: this should know how many bytes are needed, for clarity.
1126
data = protocol.read_body_bytes()
1127
# Cache the results, but only until they have been fulfilled
1129
for c_offset in coalesced:
1130
if len(data) < c_offset.length:
1131
raise errors.ShortReadvError(relpath, c_offset.start,
1132
c_offset.length, actual=len(data))
1133
for suboffset, subsize in c_offset.ranges:
1134
key = (c_offset.start+suboffset, subsize)
1135
data_map[key] = data[suboffset:suboffset+subsize]
1136
data = data[c_offset.length:]
1138
# Now that we've read some data, see if we can yield anything back
1139
while cur_offset_and_size in data_map:
1140
this_data = data_map.pop(cur_offset_and_size)
1141
yield cur_offset_and_size[0], this_data
1142
cur_offset_and_size = offset_stack.next()
1144
def rename(self, rel_from, rel_to):
1145
self._call('rename',
1146
self._remote_path(rel_from),
1147
self._remote_path(rel_to))
1149
def move(self, rel_from, rel_to):
1151
self._remote_path(rel_from),
1152
self._remote_path(rel_to))
1154
def rmdir(self, relpath):
1155
resp = self._call('rmdir', self._remote_path(relpath))
1157
def _translate_error(self, resp, orig_path=None):
1158
"""Raise an exception from a response"""
1165
elif what == 'NoSuchFile':
1166
if orig_path is not None:
1167
error_path = orig_path
1169
error_path = resp[1]
1170
raise errors.NoSuchFile(error_path)
1171
elif what == 'error':
1172
raise errors.SmartProtocolError(unicode(resp[1]))
1173
elif what == 'FileExists':
1174
raise errors.FileExists(resp[1])
1175
elif what == 'DirectoryNotEmpty':
1176
raise errors.DirectoryNotEmpty(resp[1])
1177
elif what == 'ShortReadvError':
1178
raise errors.ShortReadvError(resp[1], int(resp[2]),
1179
int(resp[3]), int(resp[4]))
1180
elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
1181
encoding = str(resp[1]) # encoding must always be a string
1183
start = int(resp[3])
1185
reason = str(resp[5]) # reason must always be a string
1186
if val.startswith('u:'):
1188
elif val.startswith('s:'):
1189
val = val[2:].decode('base64')
1190
if what == 'UnicodeDecodeError':
1191
raise UnicodeDecodeError(encoding, val, start, end, reason)
1192
elif what == 'UnicodeEncodeError':
1193
raise UnicodeEncodeError(encoding, val, start, end, reason)
1194
elif what == "ReadOnlyError":
1195
raise errors.TransportNotPossible('readonly transport')
1197
raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
1199
def disconnect(self):
1200
self._medium.disconnect()
1202
def delete_tree(self, relpath):
1203
raise errors.TransportNotPossible('readonly transport')
1205
def stat(self, relpath):
1206
resp = self._call2('stat', self._remote_path(relpath))
1207
if resp[0] == 'stat':
1208
return SmartStat(int(resp[1]), int(resp[2], 8))
1210
self._translate_error(resp)
1212
## def lock_read(self, relpath):
1213
## """Lock the given file for shared (read) access.
1214
## :return: A lock object, which should be passed to Transport.unlock()
1216
## # The old RemoteBranch ignore lock for reading, so we will
1217
## # continue that tradition and return a bogus lock object.
1218
## class BogusLock(object):
1219
## def __init__(self, path):
1221
## def unlock(self):
1223
## return BogusLock(relpath)
1228
def list_dir(self, relpath):
1229
resp = self._call2('list_dir', self._remote_path(relpath))
1230
if resp[0] == 'names':
1231
return [name.encode('ascii') for name in resp[1:]]
1233
self._translate_error(resp)
1235
def iter_files_recursive(self):
1236
resp = self._call2('iter_files_recursive', self._remote_path(''))
1237
if resp[0] == 'names':
1240
self._translate_error(resp)
1243
class SmartClientMediumRequest(object):
1244
"""A request on a SmartClientMedium.
1246
Each request allows bytes to be provided to it via accept_bytes, and then
1247
the response bytes to be read via read_bytes.
1250
request.accept_bytes('123')
1251
request.finished_writing()
1252
result = request.read_bytes(3)
1253
request.finished_reading()
1255
It is up to the individual SmartClientMedium whether multiple concurrent
1256
requests can exist. See SmartClientMedium.get_request to obtain instances
1257
of SmartClientMediumRequest, and the concrete Medium you are using for
1258
details on concurrency and pipelining.
1261
def __init__(self, medium):
1262
"""Construct a SmartClientMediumRequest for the medium medium."""
1263
self._medium = medium
1264
# we track state by constants - we may want to use the same
1265
# pattern as BodyReader if it gets more complex.
1266
# valid states are: "writing", "reading", "done"
1267
self._state = "writing"
1269
def accept_bytes(self, bytes):
1270
"""Accept bytes for inclusion in this request.
1272
This method may not be be called after finished_writing() has been
1273
called. It depends upon the Medium whether or not the bytes will be
1274
immediately transmitted. Message based Mediums will tend to buffer the
1275
bytes until finished_writing() is called.
1277
:param bytes: A bytestring.
1279
if self._state != "writing":
1280
raise errors.WritingCompleted(self)
1281
self._accept_bytes(bytes)
1283
def _accept_bytes(self, bytes):
1284
"""Helper for accept_bytes.
1286
Accept_bytes checks the state of the request to determing if bytes
1287
should be accepted. After that it hands off to _accept_bytes to do the
1290
raise NotImplementedError(self._accept_bytes)
1292
def finished_reading(self):
1293
"""Inform the request that all desired data has been read.
1295
This will remove the request from the pipeline for its medium (if the
1296
medium supports pipelining) and any further calls to methods on the
1297
request will raise ReadingCompleted.
1299
if self._state == "writing":
1300
raise errors.WritingNotComplete(self)
1301
if self._state != "reading":
1302
raise errors.ReadingCompleted(self)
1303
self._state = "done"
1304
self._finished_reading()
1306
def _finished_reading(self):
1307
"""Helper for finished_reading.
1309
finished_reading checks the state of the request to determine if
1310
finished_reading is allowed, and if it is hands off to _finished_reading
1311
to perform the action.
1313
raise NotImplementedError(self._finished_reading)
1315
def finished_writing(self):
1316
"""Finish the writing phase of this request.
1318
This will flush all pending data for this request along the medium.
1319
After calling finished_writing, you may not call accept_bytes anymore.
1321
if self._state != "writing":
1322
raise errors.WritingCompleted(self)
1323
self._state = "reading"
1324
self._finished_writing()
1326
def _finished_writing(self):
1327
"""Helper for finished_writing.
1329
finished_writing checks the state of the request to determine if
1330
finished_writing is allowed, and if it is hands off to _finished_writing
1331
to perform the action.
1333
raise NotImplementedError(self._finished_writing)
1335
def read_bytes(self, count):
1336
"""Read bytes from this requests response.
1338
This method will block and wait for count bytes to be read. It may not
1339
be invoked until finished_writing() has been called - this is to ensure
1340
a message-based approach to requests, for compatability with message
1341
based mediums like HTTP.
1343
if self._state == "writing":
1344
raise errors.WritingNotComplete(self)
1345
if self._state != "reading":
1346
raise errors.ReadingCompleted(self)
1347
return self._read_bytes(count)
1349
def _read_bytes(self, count):
1350
"""Helper for read_bytes.
1352
read_bytes checks the state of the request to determing if bytes
1353
should be read. After that it hands off to _read_bytes to do the
1356
raise NotImplementedError(self._read_bytes)
1359
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
1360
"""A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
1362
def __init__(self, medium):
1363
SmartClientMediumRequest.__init__(self, medium)
1364
# check that we are safe concurrency wise. If some streams start
1365
# allowing concurrent requests - i.e. via multiplexing - then this
1366
# assert should be moved to SmartClientStreamMedium.get_request,
1367
# and the setting/unsetting of _current_request likewise moved into
1368
# that class : but its unneeded overhead for now. RBC 20060922
1369
if self._medium._current_request is not None:
1370
raise errors.TooManyConcurrentRequests(self._medium)
1371
self._medium._current_request = self
1373
def _accept_bytes(self, bytes):
1374
"""See SmartClientMediumRequest._accept_bytes.
1376
This forwards to self._medium._accept_bytes because we are operating
1377
on the mediums stream.
1379
self._medium._accept_bytes(bytes)
1381
def _finished_reading(self):
1382
"""See SmartClientMediumRequest._finished_reading.
1384
This clears the _current_request on self._medium to allow a new
1385
request to be created.
1387
assert self._medium._current_request is self
1388
self._medium._current_request = None
1390
def _finished_writing(self):
1391
"""See SmartClientMediumRequest._finished_writing.
1393
This invokes self._medium._flush to ensure all bytes are transmitted.
1395
self._medium._flush()
1397
def _read_bytes(self, count):
1398
"""See SmartClientMediumRequest._read_bytes.
1400
This forwards to self._medium._read_bytes because we are operating
1401
on the mediums stream.
1403
return self._medium._read_bytes(count)
1406
class SmartClientRequestProtocolOne(SmartProtocolBase):
1407
"""The client-side protocol for smart version 1."""
1409
def __init__(self, request):
1410
"""Construct a SmartClientRequestProtocolOne.
1412
:param request: A SmartClientMediumRequest to serialise onto and
1415
self._request = request
1416
self._body_buffer = None
1418
def call(self, *args):
1419
bytes = _encode_tuple(args)
1420
self._request.accept_bytes(bytes)
1421
self._request.finished_writing()
1423
def call_with_body_bytes(self, args, body):
1424
"""Make a remote call of args with body bytes 'body'.
1426
After calling this, call read_response_tuple to find the result out.
1428
bytes = _encode_tuple(args)
1429
self._request.accept_bytes(bytes)
1430
bytes = self._encode_bulk_data(body)
1431
self._request.accept_bytes(bytes)
1432
self._request.finished_writing()
1434
def call_with_body_readv_array(self, args, body):
1435
"""Make a remote call with a readv array.
1437
The body is encoded with one line per readv offset pair. The numbers in
1438
each pair are separated by a comma, and no trailing \n is emitted.
1440
bytes = _encode_tuple(args)
1441
self._request.accept_bytes(bytes)
1442
readv_bytes = self._serialise_offsets(body)
1443
bytes = self._encode_bulk_data(readv_bytes)
1444
self._request.accept_bytes(bytes)
1445
self._request.finished_writing()
1447
def cancel_read_body(self):
1448
"""After expecting a body, a response code may indicate one otherwise.
1450
This method lets the domain client inform the protocol that no body
1451
will be transmitted. This is a terminal method: after calling it the
1452
protocol is not able to be used further.
1454
self._request.finished_reading()
1456
def read_response_tuple(self, expect_body=False):
1457
"""Read a response tuple from the wire.
1459
This should only be called once.
1461
result = self._recv_tuple()
1463
self._request.finished_reading()
1466
def read_body_bytes(self, count=-1):
1467
"""Read bytes from the body, decoding into a byte stream.
1469
We read all bytes at once to ensure we've checked the trailer for
1470
errors, and then feed the buffer back as read_body_bytes is called.
1472
if self._body_buffer is not None:
1473
return self._body_buffer.read(count)
1474
_body_decoder = LengthPrefixedBodyDecoder()
1475
# grab a byte from the wire: we do this so that we dont use too much
1476
# from the wire; we should have the LengthPrefixedBodyDecoder tell
1477
# us how much is needed once the header is written.
1478
# i.e. self._body_decoder.next_read_size() would be a hint.
1479
while not _body_decoder.finished_reading:
1480
byte = self._request.read_bytes(1)
1481
_body_decoder.accept_bytes(byte)
1482
self._request.finished_reading()
1483
self._body_buffer = StringIO(_body_decoder.read_pending_data())
1484
# XXX: TODO check the trailer result.
1485
return self._body_buffer.read(count)
1487
def _recv_tuple(self):
1488
"""Recieve a tuple from the medium request."""
1490
while not line or line[-1] != '\n':
1491
# yes, this is inefficient - but tuples are short.
1492
new_char = self._request.read_bytes(1)
1494
assert new_char != '', "end of file reading from server."
1495
return _decode_tuple(line)
1497
def query_version(self):
1498
"""Return protocol version number of the server."""
1500
resp = self.read_response_tuple()
1501
if resp == ('ok', '1'):
1504
raise errors.SmartProtocolError("bad response %r" % (resp,))
1507
class SmartClientMedium(object):
1508
"""Smart client is a medium for sending smart protocol requests over."""
1510
def disconnect(self):
1511
"""If this medium maintains a persistent connection, close it.
1513
The default implementation does nothing.
1517
class SmartClientStreamMedium(SmartClientMedium):
1518
"""Stream based medium common class.
1520
SmartClientStreamMediums operate on a stream. All subclasses use a common
1521
SmartClientStreamMediumRequest for their requests, and should implement
1522
_accept_bytes and _read_bytes to allow the request objects to send and
1527
self._current_request = None
1529
def accept_bytes(self, bytes):
1530
self._accept_bytes(bytes)
1533
"""The SmartClientStreamMedium knows how to close the stream when it is
1539
"""Flush the output stream.
1541
This method is used by the SmartClientStreamMediumRequest to ensure that
1542
all data for a request is sent, to avoid long timeouts or deadlocks.
1544
raise NotImplementedError(self._flush)
1546
def get_request(self):
1547
"""See SmartClientMedium.get_request().
1549
SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
1552
return SmartClientStreamMediumRequest(self)
1554
def read_bytes(self, count):
1555
return self._read_bytes(count)
1558
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
1559
"""A client medium using simple pipes.
1561
This client does not manage the pipes: it assumes they will always be open.
1564
def __init__(self, readable_pipe, writeable_pipe):
1565
SmartClientStreamMedium.__init__(self)
1566
self._readable_pipe = readable_pipe
1567
self._writeable_pipe = writeable_pipe
1569
def _accept_bytes(self, bytes):
1570
"""See SmartClientStreamMedium.accept_bytes."""
1571
self._writeable_pipe.write(bytes)
1574
"""See SmartClientStreamMedium._flush()."""
1575
self._writeable_pipe.flush()
1577
def _read_bytes(self, count):
1578
"""See SmartClientStreamMedium._read_bytes."""
1579
return self._readable_pipe.read(count)
1582
class SmartSSHClientMedium(SmartClientStreamMedium):
1583
"""A client medium using SSH."""
1585
def __init__(self, host, port=None, username=None, password=None,
1587
"""Creates a client that will connect on the first use.
1589
:param vendor: An optional override for the ssh vendor to use. See
1590
bzrlib.transport.ssh for details on ssh vendors.
1592
SmartClientStreamMedium.__init__(self)
1593
self._connected = False
1595
self._password = password
1597
self._username = username
1598
self._read_from = None
1599
self._ssh_connection = None
1600
self._vendor = vendor
1601
self._write_to = None
1603
def _accept_bytes(self, bytes):
1604
"""See SmartClientStreamMedium.accept_bytes."""
1605
self._ensure_connection()
1606
self._write_to.write(bytes)
1608
def disconnect(self):
1609
"""See SmartClientMedium.disconnect()."""
1610
if not self._connected:
1612
self._read_from.close()
1613
self._write_to.close()
1614
self._ssh_connection.close()
1615
self._connected = False
1617
def _ensure_connection(self):
1618
"""Connect this medium if not already connected."""
1621
executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
1622
if self._vendor is None:
1623
vendor = ssh._get_ssh_vendor()
1625
vendor = self._vendor
1626
self._ssh_connection = vendor.connect_ssh(self._username,
1627
self._password, self._host, self._port,
1628
command=[executable, 'serve', '--inet', '--directory=/',
1630
self._read_from, self._write_to = \
1631
self._ssh_connection.get_filelike_channels()
1632
self._connected = True
1635
"""See SmartClientStreamMedium._flush()."""
1636
self._write_to.flush()
1638
def _read_bytes(self, count):
1639
"""See SmartClientStreamMedium.read_bytes."""
1640
if not self._connected:
1641
raise errors.MediumNotConnected(self)
1642
return self._read_from.read(count)
1645
class SmartTCPClientMedium(SmartClientStreamMedium):
1646
"""A client medium using TCP."""
1648
def __init__(self, host, port):
1649
"""Creates a client that will connect on the first use."""
1650
SmartClientStreamMedium.__init__(self)
1651
self._connected = False
1656
def _accept_bytes(self, bytes):
1657
"""See SmartClientMedium.accept_bytes."""
1658
self._ensure_connection()
1659
self._socket.sendall(bytes)
1661
def disconnect(self):
1662
"""See SmartClientMedium.disconnect()."""
1663
if not self._connected:
1665
self._socket.close()
1667
self._connected = False
1669
def _ensure_connection(self):
1670
"""Connect this medium if not already connected."""
1673
self._socket = socket.socket()
1674
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
1675
result = self._socket.connect_ex((self._host, int(self._port)))
1677
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1678
(self._host, self._port, os.strerror(result)))
1679
self._connected = True
1682
"""See SmartClientStreamMedium._flush().
1684
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
1685
add a means to do a flush, but that can be done in the future.
1688
def _read_bytes(self, count):
1689
"""See SmartClientMedium.read_bytes."""
1690
if not self._connected:
1691
raise errors.MediumNotConnected(self)
1692
return self._socket.recv(count)
1695
class SmartTCPTransport(SmartTransport):
1696
"""Connection to smart server over plain tcp.
1698
This is essentially just a factory to get 'RemoteTransport(url,
1699
SmartTCPClientMedium).
1702
def __init__(self, url):
1703
_scheme, _username, _password, _host, _port, _path = \
1704
transport.split_url(url)
1707
except (ValueError, TypeError), e:
1708
raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
1709
medium = SmartTCPClientMedium(_host, _port)
1710
super(SmartTCPTransport, self).__init__(url, medium=medium)
1714
from bzrlib.transport import ssh
1715
except errors.ParamikoNotPresent:
1716
# no paramiko, no SSHTransport.
1719
class SmartSSHTransport(SmartTransport):
1720
"""Connection to smart server over SSH.
1722
This is essentially just a factory to get 'RemoteTransport(url,
1723
SmartSSHClientMedium).
1726
def __init__(self, url):
1727
_scheme, _username, _password, _host, _port, _path = \
1728
transport.split_url(url)
1730
if _port is not None:
1732
except (ValueError, TypeError), e:
1733
raise errors.InvalidURL(path=url, extra="invalid port %s" %
1735
medium = SmartSSHClientMedium(_host, _port, _username, _password)
1736
super(SmartSSHTransport, self).__init__(url, medium=medium)
1739
def get_test_permutations():
1740
"""Return (transport, server) permutations for testing."""
1741
### We may need a little more test framework support to construct an
1742
### appropriate RemoteTransport in the future.
1743
return [(SmartTCPTransport, SmartTCPServer_for_testing)]