1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Smart-server protocol, client and server.
19
Requests are sent as a command and list of arguments, followed by optional
20
bulk body data. Responses are similarly a response and list of arguments,
21
followed by bulk body data. ::
24
Fields are separated by Ctrl-A.
25
BULK_DATA := CHUNK TRAILER
26
Chunks can be repeated as many times as necessary.
27
CHUNK := CHUNK_LEN CHUNK_BODY
28
CHUNK_LEN := DIGIT+ NEWLINE
29
Gives the number of bytes in the following chunk.
30
CHUNK_BODY := BYTE[chunk_len]
31
TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
32
SUCCESS_TRAILER := 'done' NEWLINE
35
Paths are passed across the network. The client needs to see a namespace that
36
includes any repository that might need to be referenced, and the client needs
37
to know about a root directory beyond which it cannot ascend.
39
Servers run over ssh will typically want to be able to access any path the user
40
can access. Public servers on the other hand (which might be over http, ssh
41
or tcp) will typically want to restrict access to only a particular directory
42
and its children, so will want to do a software virtual root at that level.
43
In other words they'll want to rewrite incoming paths to be under that level
44
(and prevent escaping using ../ tricks.)
46
URLs that include ~ should probably be passed across to the server verbatim
47
and the server can expand them. This will proably not be meaningful when
48
limited to a directory?
50
At the bottom level socket, pipes, HTTP server. For sockets, we have the idea
51
that you have multiple requests and get a read error because the other side did
52
shutdown. For pipes we have read pipe which will have a zero read which marks
53
end-of-file. For HTTP server environment there is not end-of-stream because
54
each request coming into the server is independent.
56
So we need a wrapper around pipes and sockets to seperate out requests from
57
substrate and this will give us a single model which is consist for HTTP,
63
MEDIUM (factory for protocol, reads bytes & pushes to protocol,
64
uses protocol to detect end-of-request, sends written
65
bytes to client) e.g. socket, pipe, HTTP request handler.
70
PROTOCOL (serialization, deserialization) accepts bytes for one
71
request, decodes according to internal state, pushes
72
structured data to handler. accepts structured data from
73
handler and encodes and writes to the medium. factory for
79
HANDLER (domain logic) accepts structured data, operates state
80
machine until the request can be satisfied,
81
sends structured data to the protocol.
87
CLIENT domain logic, accepts domain requests, generated structured
88
data, reads structured data from responses and turns into
89
domain data. Sends structured data to the protocol.
90
Operates state machines until the request can be delivered
91
(e.g. reading from a bundle generated in bzrlib to deliver a
94
Possibly this should just be RemoteBzrDir, RemoteTransport,
100
PROTOCOL (serialization, deserialization) accepts structured data for one
101
request, encodes and writes to the medium. Reads bytes from the
102
medium, decodes and allows the client to read structured data.
107
MEDIUM (accepts bytes from the protocol & delivers to the remote server.
108
Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
112
# TODO: _translate_error should be on the client, not the transport because
113
# error coding is wire protocol specific.
115
# TODO: A plain integer from query_version is too simple; should give some
118
# TODO: Server should probably catch exceptions within itself and send them
119
# back across the network. (But shouldn't catch KeyboardInterrupt etc)
120
# Also needs to somehow report protocol errors like bad requests. Need to
121
# consider how we'll handle error reporting, e.g. if we get halfway through a
122
# bulk transfer and then something goes wrong.
124
# TODO: Standard marker at start of request/response lines?
126
# TODO: Make each request and response self-validatable, e.g. with checksums.
128
# TODO: get/put objects could be changed to gradually read back the data as it
129
# comes across the network
131
# TODO: What should the server do if it hits an error and has to terminate?
133
# TODO: is it useful to allow multiple chunks in the bulk data?
135
# TODO: If we get an exception during transmission of bulk data we can't just
136
# emit the exception because it won't be seen.
137
# John proposes: I think it would be worthwhile to have a header on each
138
# chunk, that indicates it is another chunk. Then you can send an 'error'
139
# chunk as long as you finish the previous chunk.
141
# TODO: Clone method on Transport; should work up towards parent directory;
142
# unclear how this should be stored or communicated to the server... maybe
143
# just pass it on all relevant requests?
145
# TODO: Better name than clone() for changing between directories. How about
146
# open_dir or change_dir or chdir?
148
# TODO: Is it really good to have the notion of current directory within the
149
# connection? Perhaps all Transports should factor out a common connection
150
# from the thing that has the directory context?
152
# TODO: Pull more things common to sftp and ssh to a higher level.
154
# TODO: The server that manages a connection should be quite small and retain
155
# minimum state because each of the requests are supposed to be stateless.
156
# Then we can write another implementation that maps to http.
158
# TODO: What to do when a client connection is garbage collected? Maybe just
159
# abruptly drop the connection?
161
# TODO: Server in some cases will need to restrict access to files outside of
162
# a particular root directory. LocalTransport doesn't do anything to stop you
163
# ascending above the base directory, so we need to prevent paths
164
# containing '..' in either the server or transport layers. (Also need to
165
# consider what happens if someone creates a symlink pointing outside the
168
# TODO: Server should rebase absolute paths coming across the network to put
169
# them under the virtual root, if one is in use. LocalTransport currently
170
# doesn't do that; if you give it an absolute path it just uses it.
172
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
173
# urlescape them instead. Indeed possibly this should just literally be
176
# FIXME: This transport, with several others, has imperfect handling of paths
177
# within urls. It'd probably be better for ".." from a root to raise an error
178
# rather than return the same directory as we do at present.
180
# TODO: Rather than working at the Transport layer we want a Branch,
181
# Repository or BzrDir objects that talk to a server.
183
# TODO: Probably want some way for server commands to gradually produce body
184
# data rather than passing it as a string; they could perhaps pass an
185
# iterator-like callback that will gradually yield data; it probably needs a
186
# close() method that will always be closed to do any necessary cleanup.
188
# TODO: Split the actual smart server from the ssh encoding of it.
190
# TODO: Perhaps support file-level readwrite operations over the transport
193
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
194
# branch doing file-level operations.
197
from cStringIO import StringIO
213
from bzrlib.bundle.serializer import write_bundle
215
from bzrlib.transport import ssh
216
except errors.ParamikoNotPresent:
217
# no paramiko. SmartSSHClientMedium will break.
220
# must do this otherwise urllib can't parse the urls properly :(
221
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh', 'bzr+http']:
222
transport.register_urlparse_netloc_protocol(scheme)
226
# Port 4155 is the default port for bzr://, registered with IANA.
227
BZR_DEFAULT_PORT = 4155
230
def _recv_tuple(from_file):
231
req_line = from_file.readline()
232
return _decode_tuple(req_line)
235
def _decode_tuple(req_line):
236
if req_line == None or req_line == '':
238
if req_line[-1] != '\n':
239
raise errors.SmartProtocolError("request %r not terminated" % req_line)
240
return tuple(req_line[:-1].split('\x01'))
243
def _encode_tuple(args):
244
"""Encode the tuple args to a bytestream."""
245
return '\x01'.join(args) + '\n'
248
class SmartProtocolBase(object):
249
"""Methods common to client and server"""
251
# TODO: this only actually accomodates a single block; possibly should
252
# support multiple chunks?
253
def _encode_bulk_data(self, body):
254
"""Encode body as a bulk data chunk."""
255
return ''.join(('%d\n' % len(body), body, 'done\n'))
257
def _serialise_offsets(self, offsets):
258
"""Serialise a readv offset list."""
260
for start, length in offsets:
261
txt.append('%d,%d' % (start, length))
262
return '\n'.join(txt)
265
class SmartServerRequestProtocolOne(SmartProtocolBase):
266
"""Server-side encoding and decoding logic for smart version 1."""
268
def __init__(self, backing_transport, write_func):
269
self._backing_transport = backing_transport
270
self.excess_buffer = ''
271
self._finished = False
273
self.has_dispatched = False
275
self._body_decoder = None
276
self._write_func = write_func
278
def accept_bytes(self, bytes):
279
"""Take bytes, and advance the internal state machine appropriately.
281
:param bytes: must be a byte string
283
assert isinstance(bytes, str)
284
self.in_buffer += bytes
285
if not self.has_dispatched:
286
if '\n' not in self.in_buffer:
287
# no command line yet
289
self.has_dispatched = True
291
first_line, self.in_buffer = self.in_buffer.split('\n', 1)
293
req_args = _decode_tuple(first_line)
294
self.request = SmartServerRequestHandler(
295
self._backing_transport)
296
self.request.dispatch_command(req_args[0], req_args[1:])
297
if self.request.finished_reading:
299
self.excess_buffer = self.in_buffer
301
self._send_response(self.request.response.args,
302
self.request.response.body)
303
except KeyboardInterrupt:
305
except Exception, exception:
306
# everything else: pass to client, flush, and quit
307
self._send_response(('error', str(exception)))
310
if self.has_dispatched:
312
# nothing to do.XXX: this routine should be a single state
314
self.excess_buffer += self.in_buffer
317
if self._body_decoder is None:
318
self._body_decoder = LengthPrefixedBodyDecoder()
319
self._body_decoder.accept_bytes(self.in_buffer)
320
self.in_buffer = self._body_decoder.unused_data
321
body_data = self._body_decoder.read_pending_data()
322
self.request.accept_body(body_data)
323
if self._body_decoder.finished_reading:
324
self.request.end_of_body()
325
assert self.request.finished_reading, \
326
"no more body, request not finished"
327
if self.request.response is not None:
328
self._send_response(self.request.response.args,
329
self.request.response.body)
330
self.excess_buffer = self.in_buffer
333
assert not self.request.finished_reading, \
334
"no response and we have finished reading."
336
def _send_response(self, args, body=None):
337
"""Send a smart server response down the output stream."""
338
assert not self._finished, 'response already sent'
339
self._finished = True
340
self._write_func(_encode_tuple(args))
342
assert isinstance(body, str), 'body must be a str'
343
bytes = self._encode_bulk_data(body)
344
self._write_func(bytes)
346
def next_read_size(self):
349
if self._body_decoder is None:
352
return self._body_decoder.next_read_size()
355
class LengthPrefixedBodyDecoder(object):
356
"""Decodes the length-prefixed bulk data."""
359
self.bytes_left = None
360
self.finished_reading = False
361
self.unused_data = ''
362
self.state_accept = self._state_accept_expecting_length
363
self.state_read = self._state_read_no_data
365
self._trailer_buffer = ''
367
def accept_bytes(self, bytes):
368
"""Decode as much of bytes as possible.
370
If 'bytes' contains too much data it will be appended to
373
finished_reading will be set when no more data is required. Further
374
data will be appended to self.unused_data.
376
# accept_bytes is allowed to change the state
377
current_state = self.state_accept
378
self.state_accept(bytes)
379
while current_state != self.state_accept:
380
current_state = self.state_accept
381
self.state_accept('')
383
def next_read_size(self):
384
if self.bytes_left is not None:
385
# Ideally we want to read all the remainder of the body and the
387
return self.bytes_left + 5
388
elif self.state_accept == self._state_accept_reading_trailer:
389
# Just the trailer left
390
return 5 - len(self._trailer_buffer)
391
elif self.state_accept == self._state_accept_expecting_length:
392
# There's still at least 6 bytes left ('\n' to end the length, plus
396
# Reading excess data. Either way, 1 byte at a time is fine.
399
def read_pending_data(self):
400
"""Return any pending data that has been decoded."""
401
return self.state_read()
403
def _state_accept_expecting_length(self, bytes):
404
self._in_buffer += bytes
405
pos = self._in_buffer.find('\n')
408
self.bytes_left = int(self._in_buffer[:pos])
409
self._in_buffer = self._in_buffer[pos+1:]
410
self.bytes_left -= len(self._in_buffer)
411
self.state_accept = self._state_accept_reading_body
412
self.state_read = self._state_read_in_buffer
414
def _state_accept_reading_body(self, bytes):
415
self._in_buffer += bytes
416
self.bytes_left -= len(bytes)
417
if self.bytes_left <= 0:
419
if self.bytes_left != 0:
420
self._trailer_buffer = self._in_buffer[self.bytes_left:]
421
self._in_buffer = self._in_buffer[:self.bytes_left]
422
self.bytes_left = None
423
self.state_accept = self._state_accept_reading_trailer
425
def _state_accept_reading_trailer(self, bytes):
426
self._trailer_buffer += bytes
427
# TODO: what if the trailer does not match "done\n"? Should this raise
428
# a ProtocolViolation exception?
429
if self._trailer_buffer.startswith('done\n'):
430
self.unused_data = self._trailer_buffer[len('done\n'):]
431
self.state_accept = self._state_accept_reading_unused
432
self.finished_reading = True
434
def _state_accept_reading_unused(self, bytes):
435
self.unused_data += bytes
437
def _state_read_no_data(self):
440
def _state_read_in_buffer(self):
441
result = self._in_buffer
446
class SmartServerStreamMedium(object):
447
"""Handles smart commands coming over a stream.
449
The stream may be a pipe connected to sshd, or a tcp socket, or an
450
in-process fifo for testing.
452
One instance is created for each connected client; it can serve multiple
453
requests in the lifetime of the connection.
455
The server passes requests through to an underlying backing transport,
456
which will typically be a LocalTransport looking at the server's filesystem.
459
def __init__(self, backing_transport):
460
"""Construct new server.
462
:param backing_transport: Transport for the directory served.
464
# backing_transport could be passed to serve instead of __init__
465
self.backing_transport = backing_transport
466
self.finished = False
469
"""Serve requests until the client disconnects."""
470
# Keep a reference to stderr because the sys module's globals get set to
471
# None during interpreter shutdown.
472
from sys import stderr
474
while not self.finished:
475
protocol = SmartServerRequestProtocolOne(self.backing_transport,
477
self._serve_one_request(protocol)
479
stderr.write("%s terminating on exception %s\n" % (self, e))
482
def _serve_one_request(self, protocol):
483
"""Read one request from input, process, send back a response.
485
:param protocol: a SmartServerRequestProtocol.
488
self._serve_one_request_unguarded(protocol)
489
except KeyboardInterrupt:
492
self.terminate_due_to_error()
494
def terminate_due_to_error(self):
495
"""Called when an unhandled exception from the protocol occurs."""
496
raise NotImplementedError(self.terminate_due_to_error)
499
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
501
def __init__(self, sock, backing_transport):
504
:param sock: the socket the server will read from. It will be put
507
SmartServerStreamMedium.__init__(self, backing_transport)
509
sock.setblocking(True)
512
def _serve_one_request_unguarded(self, protocol):
513
while protocol.next_read_size():
515
protocol.accept_bytes(self.push_back)
518
bytes = self.socket.recv(4096)
522
protocol.accept_bytes(bytes)
524
self.push_back = protocol.excess_buffer
526
def terminate_due_to_error(self):
527
"""Called when an unhandled exception from the protocol occurs."""
528
# TODO: This should log to a server log file, but no such thing
529
# exists yet. Andrew Bennetts 2006-09-29.
533
def _write_out(self, bytes):
534
self.socket.sendall(bytes)
537
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
539
def __init__(self, in_file, out_file, backing_transport):
540
"""Construct new server.
542
:param in_file: Python file from which requests can be read.
543
:param out_file: Python file to write responses.
544
:param backing_transport: Transport for the directory served.
546
SmartServerStreamMedium.__init__(self, backing_transport)
550
def _serve_one_request_unguarded(self, protocol):
552
bytes_to_read = protocol.next_read_size()
553
if bytes_to_read == 0:
554
# Finished serving this request.
557
bytes = self._in.read(bytes_to_read)
559
# Connection has been closed.
563
protocol.accept_bytes(bytes)
565
def terminate_due_to_error(self):
566
# TODO: This should log to a server log file, but no such thing
567
# exists yet. Andrew Bennetts 2006-09-29.
571
def _write_out(self, bytes):
572
self._out.write(bytes)
575
class SmartServerResponse(object):
576
"""Response generated by SmartServerRequestHandler."""
578
def __init__(self, args, body=None):
582
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
583
# for delivering the data for a request. This could be done with as the
584
# StreamServer, though that would create conflation between request and response
585
# which may be undesirable.
588
class SmartServerRequestHandler(object):
589
"""Protocol logic for smart server.
591
This doesn't handle serialization at all, it just processes requests and
595
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
596
# not contain encoding or decoding logic to allow the wire protocol to vary
597
# from the object protocol: we will want to tweak the wire protocol separate
598
# from the object model, and ideally we will be able to do that without
599
# having a SmartServerRequestHandler subclass for each wire protocol, rather
600
# just a Protocol subclass.
602
# TODO: Better way of representing the body for commands that take it,
603
# and allow it to be streamed into the server.
605
def __init__(self, backing_transport):
606
self._backing_transport = backing_transport
607
self._converted_command = False
608
self.finished_reading = False
609
self._body_bytes = ''
612
def accept_body(self, bytes):
615
This should be overriden for each command that desired body data to
616
handle the right format of that data. I.e. plain bytes, a bundle etc.
618
The deserialisation into that format should be done in the Protocol
619
object. Set self.desired_body_format to the format your method will
622
# default fallback is to accumulate bytes.
623
self._body_bytes += bytes
625
def _end_of_body_handler(self):
626
"""An unimplemented end of body handler."""
627
raise NotImplementedError(self._end_of_body_handler)
630
"""Answer a version request with my version."""
631
return SmartServerResponse(('ok', '1'))
633
def do_has(self, relpath):
634
r = self._backing_transport.has(relpath) and 'yes' or 'no'
635
return SmartServerResponse((r,))
637
def do_get(self, relpath):
638
backing_bytes = self._backing_transport.get_bytes(relpath)
639
return SmartServerResponse(('ok',), backing_bytes)
641
def _deserialise_optional_mode(self, mode):
642
# XXX: FIXME this should be on the protocol object.
648
def do_append(self, relpath, mode):
649
self._converted_command = True
650
self._relpath = relpath
651
self._mode = self._deserialise_optional_mode(mode)
652
self._end_of_body_handler = self._handle_do_append_end
654
def _handle_do_append_end(self):
655
old_length = self._backing_transport.append_bytes(
656
self._relpath, self._body_bytes, self._mode)
657
self.response = SmartServerResponse(('appended', '%d' % old_length))
659
def do_delete(self, relpath):
660
self._backing_transport.delete(relpath)
662
def do_iter_files_recursive(self, relpath):
663
transport = self._backing_transport.clone(relpath)
664
filenames = transport.iter_files_recursive()
665
return SmartServerResponse(('names',) + tuple(filenames))
667
def do_list_dir(self, relpath):
668
filenames = self._backing_transport.list_dir(relpath)
669
return SmartServerResponse(('names',) + tuple(filenames))
671
def do_mkdir(self, relpath, mode):
672
self._backing_transport.mkdir(relpath,
673
self._deserialise_optional_mode(mode))
675
def do_move(self, rel_from, rel_to):
676
self._backing_transport.move(rel_from, rel_to)
678
def do_put(self, relpath, mode):
679
self._converted_command = True
680
self._relpath = relpath
681
self._mode = self._deserialise_optional_mode(mode)
682
self._end_of_body_handler = self._handle_do_put
684
def _handle_do_put(self):
685
self._backing_transport.put_bytes(self._relpath,
686
self._body_bytes, self._mode)
687
self.response = SmartServerResponse(('ok',))
689
def _deserialise_offsets(self, text):
690
# XXX: FIXME this should be on the protocol object.
692
for line in text.split('\n'):
695
start, length = line.split(',')
696
offsets.append((int(start), int(length)))
699
def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
700
self._converted_command = True
701
self._end_of_body_handler = self._handle_put_non_atomic
702
self._relpath = relpath
703
self._dir_mode = self._deserialise_optional_mode(dir_mode)
704
self._mode = self._deserialise_optional_mode(mode)
705
# a boolean would be nicer XXX
706
self._create_parent = (create_parent == 'T')
708
def _handle_put_non_atomic(self):
709
self._backing_transport.put_bytes_non_atomic(self._relpath,
712
create_parent_dir=self._create_parent,
713
dir_mode=self._dir_mode)
714
self.response = SmartServerResponse(('ok',))
716
def do_readv(self, relpath):
717
self._converted_command = True
718
self._end_of_body_handler = self._handle_readv_offsets
719
self._relpath = relpath
721
def end_of_body(self):
722
"""No more body data will be received."""
723
self._run_handler_code(self._end_of_body_handler, (), {})
724
# cannot read after this.
725
self.finished_reading = True
727
def _handle_readv_offsets(self):
728
"""accept offsets for a readv request."""
729
offsets = self._deserialise_offsets(self._body_bytes)
730
backing_bytes = ''.join(bytes for offset, bytes in
731
self._backing_transport.readv(self._relpath, offsets))
732
self.response = SmartServerResponse(('readv',), backing_bytes)
734
def do_rename(self, rel_from, rel_to):
735
self._backing_transport.rename(rel_from, rel_to)
737
def do_rmdir(self, relpath):
738
self._backing_transport.rmdir(relpath)
740
def do_stat(self, relpath):
741
stat = self._backing_transport.stat(relpath)
742
return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
744
def do_get_bundle(self, path, revision_id):
745
# open transport relative to our base
746
t = self._backing_transport.clone(path)
747
control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
748
repo = control.open_repository()
749
tmpf = tempfile.TemporaryFile()
750
base_revision = revision.NULL_REVISION
751
write_bundle(repo, revision_id, base_revision, tmpf)
753
return SmartServerResponse((), tmpf.read())
755
def dispatch_command(self, cmd, args):
756
"""Deprecated compatibility method.""" # XXX XXX
757
func = getattr(self, 'do_' + cmd, None)
759
raise errors.SmartProtocolError("bad request %r" % (cmd,))
760
self._run_handler_code(func, args, {})
762
def _run_handler_code(self, callable, args, kwargs):
763
"""Run some handler specific code 'callable'.
765
If a result is returned, it is considered to be the commands response,
766
and finished_reading is set true, and its assigned to self.response.
768
Any exceptions caught are translated and a response object created
771
result = self._call_converting_errors(callable, args, kwargs)
772
if result is not None:
773
self.response = result
774
self.finished_reading = True
775
# handle unconverted commands
776
if not self._converted_command:
777
self.finished_reading = True
779
self.response = SmartServerResponse(('ok',))
781
def _call_converting_errors(self, callable, args, kwargs):
782
"""Call callable converting errors to Response objects."""
784
return callable(*args, **kwargs)
785
except errors.NoSuchFile, e:
786
return SmartServerResponse(('NoSuchFile', e.path))
787
except errors.FileExists, e:
788
return SmartServerResponse(('FileExists', e.path))
789
except errors.DirectoryNotEmpty, e:
790
return SmartServerResponse(('DirectoryNotEmpty', e.path))
791
except errors.ShortReadvError, e:
792
return SmartServerResponse(('ShortReadvError',
793
e.path, str(e.offset), str(e.length), str(e.actual)))
794
except UnicodeError, e:
795
# If it is a DecodeError, than most likely we are starting
796
# with a plain string
797
str_or_unicode = e.object
798
if isinstance(str_or_unicode, unicode):
799
# XXX: UTF-8 might have \x01 (our seperator byte) in it. We
800
# should escape it somehow.
801
val = 'u:' + str_or_unicode.encode('utf-8')
803
val = 's:' + str_or_unicode.encode('base64')
804
# This handles UnicodeEncodeError or UnicodeDecodeError
805
return SmartServerResponse((e.__class__.__name__,
806
e.encoding, val, str(e.start), str(e.end), e.reason))
807
except errors.TransportNotPossible, e:
808
if e.msg == "readonly transport":
809
return SmartServerResponse(('ReadOnlyError', ))
814
class SmartTCPServer(object):
815
"""Listens on a TCP socket and accepts connections from smart clients"""
817
def __init__(self, backing_transport, host='127.0.0.1', port=0):
818
"""Construct a new server.
820
To actually start it running, call either start_background_thread or
823
:param host: Name of the interface to listen on.
824
:param port: TCP port to listen on, or 0 to allocate a transient port.
826
self._server_socket = socket.socket()
827
self._server_socket.bind((host, port))
828
self.port = self._server_socket.getsockname()[1]
829
self._server_socket.listen(1)
830
self._server_socket.settimeout(1)
831
self.backing_transport = backing_transport
834
# let connections timeout so that we get a chance to terminate
835
# Keep a reference to the exceptions we want to catch because the socket
836
# module's globals get set to None during interpreter shutdown.
837
from socket import timeout as socket_timeout
838
from socket import error as socket_error
839
self._should_terminate = False
840
while not self._should_terminate:
842
self.accept_and_serve()
843
except socket_timeout:
844
# just check if we're asked to stop
846
except socket_error, e:
847
trace.warning("client disconnected: %s", e)
851
"""Return the url of the server"""
852
return "bzr://%s:%d/" % self._server_socket.getsockname()
854
def accept_and_serve(self):
855
conn, client_addr = self._server_socket.accept()
856
# For WIN32, where the timeout value from the listening socket
857
# propogates to the newly accepted socket.
858
conn.setblocking(True)
859
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
860
handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
861
connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
862
connection_thread.setDaemon(True)
863
connection_thread.start()
865
def start_background_thread(self):
866
self._server_thread = threading.Thread(None,
868
name='server-' + self.get_url())
869
self._server_thread.setDaemon(True)
870
self._server_thread.start()
872
def stop_background_thread(self):
873
self._should_terminate = True
874
# At one point we would wait to join the threads here, but it looks
875
# like they don't actually exit. So now we just leave them running
876
# and expect to terminate the process. -- mbp 20070215
877
# self._server_socket.close()
878
## sys.stderr.write("waiting for server thread to finish...")
879
## self._server_thread.join()
882
class SmartTCPServer_for_testing(SmartTCPServer):
883
"""Server suitable for use by transport tests.
885
This server is backed by the process's cwd.
889
self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
890
# The server is set up by default like for ssh access: the client
891
# passes filesystem-absolute paths; therefore the server must look
892
# them up relative to the root directory. it might be better to act
893
# a public server and have the server rewrite paths into the test
895
SmartTCPServer.__init__(self,
896
transport.get_transport(urlutils.local_path_to_url('/')))
899
"""Set up server for testing"""
900
self.start_background_thread()
903
self.stop_background_thread()
906
"""Return the url of the server"""
907
host, port = self._server_socket.getsockname()
908
return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
910
def get_bogus_url(self):
911
"""Return a URL which will fail to connect"""
912
return 'bzr://127.0.0.1:1/'
915
class SmartStat(object):
917
def __init__(self, size, mode):
922
class SmartTransport(transport.Transport):
923
"""Connection to a smart server.
925
The connection holds references to pipes that can be used to send requests
928
The connection has a notion of the current directory to which it's
929
connected; this is incorporated in filenames passed to the server.
931
This supports some higher-level RPC operations and can also be treated
932
like a Transport to do file-like operations.
934
The connection can be made over a tcp socket, or (in future) an ssh pipe
935
or a series of http requests. There are concrete subclasses for each
936
type: SmartTCPTransport, etc.
939
# IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
940
# responsibilities: Put those on SmartClient or similar. This is vital for
941
# the ability to support multiple versions of the smart protocol over time:
942
# SmartTransport is an adapter from the Transport object model to the
943
# SmartClient model, not an encoder.
945
def __init__(self, url, clone_from=None, medium=None):
948
:param medium: The medium to use for this RemoteTransport. This must be
949
supplied if clone_from is None.
951
### Technically super() here is faulty because Transport's __init__
952
### fails to take 2 parameters, and if super were to choose a silly
953
### initialisation order things would blow up.
954
if not url.endswith('/'):
956
super(SmartTransport, self).__init__(url)
957
self._scheme, self._username, self._password, self._host, self._port, self._path = \
958
transport.split_url(url)
959
if clone_from is None:
960
self._medium = medium
962
# credentials may be stripped from the base in some circumstances
963
# as yet to be clearly defined or documented, so copy them.
964
self._username = clone_from._username
965
# reuse same connection
966
self._medium = clone_from._medium
967
assert self._medium is not None
969
def abspath(self, relpath):
970
"""Return the full url to the given relative path.
972
@param relpath: the relative path or path components
973
@type relpath: str or list
975
return self._unparse_url(self._remote_path(relpath))
977
def clone(self, relative_url):
978
"""Make a new SmartTransport related to me, sharing the same connection.
980
This essentially opens a handle on a different remote directory.
982
if relative_url is None:
983
return SmartTransport(self.base, self)
985
return SmartTransport(self.abspath(relative_url), self)
987
def is_readonly(self):
988
"""Smart server transport can do read/write file operations."""
991
def get_smart_client(self):
994
def get_smart_medium(self):
997
def _unparse_url(self, path):
998
"""Return URL for a path.
1000
:see: SFTPUrlHandling._unparse_url
1002
# TODO: Eventually it should be possible to unify this with
1003
# SFTPUrlHandling._unparse_url?
1006
path = urllib.quote(path)
1007
netloc = urllib.quote(self._host)
1008
if self._username is not None:
1009
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
1010
if self._port is not None:
1011
netloc = '%s:%d' % (netloc, self._port)
1012
return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
1014
def _remote_path(self, relpath):
1015
"""Returns the Unicode version of the absolute path for relpath."""
1016
return self._combine_paths(self._path, relpath)
1018
def _call(self, method, *args):
1019
resp = self._call2(method, *args)
1020
self._translate_error(resp)
1022
def _call2(self, method, *args):
1023
"""Call a method on the remote server."""
1024
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1025
protocol.call(method, *args)
1026
return protocol.read_response_tuple()
1028
def _call_with_body_bytes(self, method, args, body):
1029
"""Call a method on the remote server with body bytes."""
1030
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1031
protocol.call_with_body_bytes((method, ) + args, body)
1032
return protocol.read_response_tuple()
1034
def has(self, relpath):
1035
"""Indicate whether a remote file of the given name exists or not.
1037
:see: Transport.has()
1039
resp = self._call2('has', self._remote_path(relpath))
1040
if resp == ('yes', ):
1042
elif resp == ('no', ):
1045
self._translate_error(resp)
1047
def get(self, relpath):
1048
"""Return file-like object reading the contents of a remote file.
1050
:see: Transport.get_bytes()/get_file()
1052
return StringIO(self.get_bytes(relpath))
1054
def get_bytes(self, relpath):
1055
remote = self._remote_path(relpath)
1056
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1057
protocol.call('get', remote)
1058
resp = protocol.read_response_tuple(True)
1059
if resp != ('ok', ):
1060
protocol.cancel_read_body()
1061
self._translate_error(resp, relpath)
1062
return protocol.read_body_bytes()
1064
def _serialise_optional_mode(self, mode):
1070
def mkdir(self, relpath, mode=None):
1071
resp = self._call2('mkdir', self._remote_path(relpath),
1072
self._serialise_optional_mode(mode))
1073
self._translate_error(resp)
1075
def put_bytes(self, relpath, upload_contents, mode=None):
1076
# FIXME: upload_file is probably not safe for non-ascii characters -
1077
# should probably just pass all parameters as length-delimited
1079
resp = self._call_with_body_bytes('put',
1080
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
1082
self._translate_error(resp)
1084
def put_bytes_non_atomic(self, relpath, bytes, mode=None,
1085
create_parent_dir=False,
1087
"""See Transport.put_bytes_non_atomic."""
1088
# FIXME: no encoding in the transport!
1089
create_parent_str = 'F'
1090
if create_parent_dir:
1091
create_parent_str = 'T'
1093
resp = self._call_with_body_bytes(
1095
(self._remote_path(relpath), self._serialise_optional_mode(mode),
1096
create_parent_str, self._serialise_optional_mode(dir_mode)),
1098
self._translate_error(resp)
1100
def put_file(self, relpath, upload_file, mode=None):
1101
# its not ideal to seek back, but currently put_non_atomic_file depends
1102
# on transports not reading before failing - which is a faulty
1103
# assumption I think - RBC 20060915
1104
pos = upload_file.tell()
1106
return self.put_bytes(relpath, upload_file.read(), mode)
1108
upload_file.seek(pos)
1111
def put_file_non_atomic(self, relpath, f, mode=None,
1112
create_parent_dir=False,
1114
return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
1115
create_parent_dir=create_parent_dir,
1118
def append_file(self, relpath, from_file, mode=None):
1119
return self.append_bytes(relpath, from_file.read(), mode)
1121
def append_bytes(self, relpath, bytes, mode=None):
1122
resp = self._call_with_body_bytes(
1124
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
1126
if resp[0] == 'appended':
1128
self._translate_error(resp)
1130
def delete(self, relpath):
1131
resp = self._call2('delete', self._remote_path(relpath))
1132
self._translate_error(resp)
1134
def readv(self, relpath, offsets):
1138
offsets = list(offsets)
1140
sorted_offsets = sorted(offsets)
1141
# turn the list of offsets into a stack
1142
offset_stack = iter(offsets)
1143
cur_offset_and_size = offset_stack.next()
1144
coalesced = list(self._coalesce_offsets(sorted_offsets,
1145
limit=self._max_readv_combine,
1146
fudge_factor=self._bytes_to_read_before_seek))
1148
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1149
protocol.call_with_body_readv_array(
1150
('readv', self._remote_path(relpath)),
1151
[(c.start, c.length) for c in coalesced])
1152
resp = protocol.read_response_tuple(True)
1154
if resp[0] != 'readv':
1155
# This should raise an exception
1156
protocol.cancel_read_body()
1157
self._translate_error(resp)
1160
# FIXME: this should know how many bytes are needed, for clarity.
1161
data = protocol.read_body_bytes()
1162
# Cache the results, but only until they have been fulfilled
1164
for c_offset in coalesced:
1165
if len(data) < c_offset.length:
1166
raise errors.ShortReadvError(relpath, c_offset.start,
1167
c_offset.length, actual=len(data))
1168
for suboffset, subsize in c_offset.ranges:
1169
key = (c_offset.start+suboffset, subsize)
1170
data_map[key] = data[suboffset:suboffset+subsize]
1171
data = data[c_offset.length:]
1173
# Now that we've read some data, see if we can yield anything back
1174
while cur_offset_and_size in data_map:
1175
this_data = data_map.pop(cur_offset_and_size)
1176
yield cur_offset_and_size[0], this_data
1177
cur_offset_and_size = offset_stack.next()
1179
def rename(self, rel_from, rel_to):
1180
self._call('rename',
1181
self._remote_path(rel_from),
1182
self._remote_path(rel_to))
1184
def move(self, rel_from, rel_to):
1186
self._remote_path(rel_from),
1187
self._remote_path(rel_to))
1189
def rmdir(self, relpath):
1190
resp = self._call('rmdir', self._remote_path(relpath))
1192
def _translate_error(self, resp, orig_path=None):
1193
"""Raise an exception from a response"""
1200
elif what == 'NoSuchFile':
1201
if orig_path is not None:
1202
error_path = orig_path
1204
error_path = resp[1]
1205
raise errors.NoSuchFile(error_path)
1206
elif what == 'error':
1207
raise errors.SmartProtocolError(unicode(resp[1]))
1208
elif what == 'FileExists':
1209
raise errors.FileExists(resp[1])
1210
elif what == 'DirectoryNotEmpty':
1211
raise errors.DirectoryNotEmpty(resp[1])
1212
elif what == 'ShortReadvError':
1213
raise errors.ShortReadvError(resp[1], int(resp[2]),
1214
int(resp[3]), int(resp[4]))
1215
elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
1216
encoding = str(resp[1]) # encoding must always be a string
1218
start = int(resp[3])
1220
reason = str(resp[5]) # reason must always be a string
1221
if val.startswith('u:'):
1222
val = val[2:].decode('utf-8')
1223
elif val.startswith('s:'):
1224
val = val[2:].decode('base64')
1225
if what == 'UnicodeDecodeError':
1226
raise UnicodeDecodeError(encoding, val, start, end, reason)
1227
elif what == 'UnicodeEncodeError':
1228
raise UnicodeEncodeError(encoding, val, start, end, reason)
1229
elif what == "ReadOnlyError":
1230
raise errors.TransportNotPossible('readonly transport')
1232
raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
1234
def disconnect(self):
1235
self._medium.disconnect()
1237
def delete_tree(self, relpath):
1238
raise errors.TransportNotPossible('readonly transport')
1240
def stat(self, relpath):
1241
resp = self._call2('stat', self._remote_path(relpath))
1242
if resp[0] == 'stat':
1243
return SmartStat(int(resp[1]), int(resp[2], 8))
1245
self._translate_error(resp)
1247
## def lock_read(self, relpath):
1248
## """Lock the given file for shared (read) access.
1249
## :return: A lock object, which should be passed to Transport.unlock()
1251
## # The old RemoteBranch ignore lock for reading, so we will
1252
## # continue that tradition and return a bogus lock object.
1253
## class BogusLock(object):
1254
## def __init__(self, path):
1256
## def unlock(self):
1258
## return BogusLock(relpath)
1263
def list_dir(self, relpath):
1264
resp = self._call2('list_dir', self._remote_path(relpath))
1265
if resp[0] == 'names':
1266
return [name.encode('ascii') for name in resp[1:]]
1268
self._translate_error(resp)
1270
def iter_files_recursive(self):
1271
resp = self._call2('iter_files_recursive', self._remote_path(''))
1272
if resp[0] == 'names':
1275
self._translate_error(resp)
1278
class SmartClientMediumRequest(object):
1279
"""A request on a SmartClientMedium.
1281
Each request allows bytes to be provided to it via accept_bytes, and then
1282
the response bytes to be read via read_bytes.
1285
request.accept_bytes('123')
1286
request.finished_writing()
1287
result = request.read_bytes(3)
1288
request.finished_reading()
1290
It is up to the individual SmartClientMedium whether multiple concurrent
1291
requests can exist. See SmartClientMedium.get_request to obtain instances
1292
of SmartClientMediumRequest, and the concrete Medium you are using for
1293
details on concurrency and pipelining.
1296
def __init__(self, medium):
1297
"""Construct a SmartClientMediumRequest for the medium medium."""
1298
self._medium = medium
1299
# we track state by constants - we may want to use the same
1300
# pattern as BodyReader if it gets more complex.
1301
# valid states are: "writing", "reading", "done"
1302
self._state = "writing"
1304
def accept_bytes(self, bytes):
1305
"""Accept bytes for inclusion in this request.
1307
This method may not be be called after finished_writing() has been
1308
called. It depends upon the Medium whether or not the bytes will be
1309
immediately transmitted. Message based Mediums will tend to buffer the
1310
bytes until finished_writing() is called.
1312
:param bytes: A bytestring.
1314
if self._state != "writing":
1315
raise errors.WritingCompleted(self)
1316
self._accept_bytes(bytes)
1318
def _accept_bytes(self, bytes):
1319
"""Helper for accept_bytes.
1321
Accept_bytes checks the state of the request to determing if bytes
1322
should be accepted. After that it hands off to _accept_bytes to do the
1325
raise NotImplementedError(self._accept_bytes)
1327
def finished_reading(self):
1328
"""Inform the request that all desired data has been read.
1330
This will remove the request from the pipeline for its medium (if the
1331
medium supports pipelining) and any further calls to methods on the
1332
request will raise ReadingCompleted.
1334
if self._state == "writing":
1335
raise errors.WritingNotComplete(self)
1336
if self._state != "reading":
1337
raise errors.ReadingCompleted(self)
1338
self._state = "done"
1339
self._finished_reading()
1341
def _finished_reading(self):
1342
"""Helper for finished_reading.
1344
finished_reading checks the state of the request to determine if
1345
finished_reading is allowed, and if it is hands off to _finished_reading
1346
to perform the action.
1348
raise NotImplementedError(self._finished_reading)
1350
def finished_writing(self):
1351
"""Finish the writing phase of this request.
1353
This will flush all pending data for this request along the medium.
1354
After calling finished_writing, you may not call accept_bytes anymore.
1356
if self._state != "writing":
1357
raise errors.WritingCompleted(self)
1358
self._state = "reading"
1359
self._finished_writing()
1361
def _finished_writing(self):
1362
"""Helper for finished_writing.
1364
finished_writing checks the state of the request to determine if
1365
finished_writing is allowed, and if it is hands off to _finished_writing
1366
to perform the action.
1368
raise NotImplementedError(self._finished_writing)
1370
def read_bytes(self, count):
1371
"""Read bytes from this requests response.
1373
This method will block and wait for count bytes to be read. It may not
1374
be invoked until finished_writing() has been called - this is to ensure
1375
a message-based approach to requests, for compatability with message
1376
based mediums like HTTP.
1378
if self._state == "writing":
1379
raise errors.WritingNotComplete(self)
1380
if self._state != "reading":
1381
raise errors.ReadingCompleted(self)
1382
return self._read_bytes(count)
1384
def _read_bytes(self, count):
1385
"""Helper for read_bytes.
1387
read_bytes checks the state of the request to determing if bytes
1388
should be read. After that it hands off to _read_bytes to do the
1391
raise NotImplementedError(self._read_bytes)
1394
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
1395
"""A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
1397
def __init__(self, medium):
1398
SmartClientMediumRequest.__init__(self, medium)
1399
# check that we are safe concurrency wise. If some streams start
1400
# allowing concurrent requests - i.e. via multiplexing - then this
1401
# assert should be moved to SmartClientStreamMedium.get_request,
1402
# and the setting/unsetting of _current_request likewise moved into
1403
# that class : but its unneeded overhead for now. RBC 20060922
1404
if self._medium._current_request is not None:
1405
raise errors.TooManyConcurrentRequests(self._medium)
1406
self._medium._current_request = self
1408
def _accept_bytes(self, bytes):
1409
"""See SmartClientMediumRequest._accept_bytes.
1411
This forwards to self._medium._accept_bytes because we are operating
1412
on the mediums stream.
1414
self._medium._accept_bytes(bytes)
1416
def _finished_reading(self):
1417
"""See SmartClientMediumRequest._finished_reading.
1419
This clears the _current_request on self._medium to allow a new
1420
request to be created.
1422
assert self._medium._current_request is self
1423
self._medium._current_request = None
1425
def _finished_writing(self):
1426
"""See SmartClientMediumRequest._finished_writing.
1428
This invokes self._medium._flush to ensure all bytes are transmitted.
1430
self._medium._flush()
1432
def _read_bytes(self, count):
1433
"""See SmartClientMediumRequest._read_bytes.
1435
This forwards to self._medium._read_bytes because we are operating
1436
on the mediums stream.
1438
return self._medium._read_bytes(count)
1441
class SmartClientRequestProtocolOne(SmartProtocolBase):
1442
"""The client-side protocol for smart version 1."""
1444
def __init__(self, request):
1445
"""Construct a SmartClientRequestProtocolOne.
1447
:param request: A SmartClientMediumRequest to serialise onto and
1450
self._request = request
1451
self._body_buffer = None
1453
def call(self, *args):
1454
bytes = _encode_tuple(args)
1455
self._request.accept_bytes(bytes)
1456
self._request.finished_writing()
1458
def call_with_body_bytes(self, args, body):
1459
"""Make a remote call of args with body bytes 'body'.
1461
After calling this, call read_response_tuple to find the result out.
1463
bytes = _encode_tuple(args)
1464
self._request.accept_bytes(bytes)
1465
bytes = self._encode_bulk_data(body)
1466
self._request.accept_bytes(bytes)
1467
self._request.finished_writing()
1469
def call_with_body_readv_array(self, args, body):
1470
"""Make a remote call with a readv array.
1472
The body is encoded with one line per readv offset pair. The numbers in
1473
each pair are separated by a comma, and no trailing \n is emitted.
1475
bytes = _encode_tuple(args)
1476
self._request.accept_bytes(bytes)
1477
readv_bytes = self._serialise_offsets(body)
1478
bytes = self._encode_bulk_data(readv_bytes)
1479
self._request.accept_bytes(bytes)
1480
self._request.finished_writing()
1482
def cancel_read_body(self):
1483
"""After expecting a body, a response code may indicate one otherwise.
1485
This method lets the domain client inform the protocol that no body
1486
will be transmitted. This is a terminal method: after calling it the
1487
protocol is not able to be used further.
1489
self._request.finished_reading()
1491
def read_response_tuple(self, expect_body=False):
1492
"""Read a response tuple from the wire.
1494
This should only be called once.
1496
result = self._recv_tuple()
1498
self._request.finished_reading()
1501
def read_body_bytes(self, count=-1):
1502
"""Read bytes from the body, decoding into a byte stream.
1504
We read all bytes at once to ensure we've checked the trailer for
1505
errors, and then feed the buffer back as read_body_bytes is called.
1507
if self._body_buffer is not None:
1508
return self._body_buffer.read(count)
1509
_body_decoder = LengthPrefixedBodyDecoder()
1511
while not _body_decoder.finished_reading:
1512
bytes_wanted = _body_decoder.next_read_size()
1513
bytes = self._request.read_bytes(bytes_wanted)
1514
_body_decoder.accept_bytes(bytes)
1515
self._request.finished_reading()
1516
self._body_buffer = StringIO(_body_decoder.read_pending_data())
1517
# XXX: TODO check the trailer result.
1518
return self._body_buffer.read(count)
1520
def _recv_tuple(self):
1521
"""Receive a tuple from the medium request."""
1523
while not line or line[-1] != '\n':
1524
# TODO: this is inefficient - but tuples are short.
1525
new_char = self._request.read_bytes(1)
1527
assert new_char != '', "end of file reading from server."
1528
return _decode_tuple(line)
1530
def query_version(self):
1531
"""Return protocol version number of the server."""
1533
resp = self.read_response_tuple()
1534
if resp == ('ok', '1'):
1537
raise errors.SmartProtocolError("bad response %r" % (resp,))
1540
class SmartClientMedium(object):
1541
"""Smart client is a medium for sending smart protocol requests over."""
1543
def disconnect(self):
1544
"""If this medium maintains a persistent connection, close it.
1546
The default implementation does nothing.
1550
class SmartClientStreamMedium(SmartClientMedium):
1551
"""Stream based medium common class.
1553
SmartClientStreamMediums operate on a stream. All subclasses use a common
1554
SmartClientStreamMediumRequest for their requests, and should implement
1555
_accept_bytes and _read_bytes to allow the request objects to send and
1560
self._current_request = None
1562
def accept_bytes(self, bytes):
1563
self._accept_bytes(bytes)
1566
"""The SmartClientStreamMedium knows how to close the stream when it is
1572
"""Flush the output stream.
1574
This method is used by the SmartClientStreamMediumRequest to ensure that
1575
all data for a request is sent, to avoid long timeouts or deadlocks.
1577
raise NotImplementedError(self._flush)
1579
def get_request(self):
1580
"""See SmartClientMedium.get_request().
1582
SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
1585
return SmartClientStreamMediumRequest(self)
1587
def read_bytes(self, count):
1588
return self._read_bytes(count)
1591
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
1592
"""A client medium using simple pipes.
1594
This client does not manage the pipes: it assumes they will always be open.
1597
def __init__(self, readable_pipe, writeable_pipe):
1598
SmartClientStreamMedium.__init__(self)
1599
self._readable_pipe = readable_pipe
1600
self._writeable_pipe = writeable_pipe
1602
def _accept_bytes(self, bytes):
1603
"""See SmartClientStreamMedium.accept_bytes."""
1604
self._writeable_pipe.write(bytes)
1607
"""See SmartClientStreamMedium._flush()."""
1608
self._writeable_pipe.flush()
1610
def _read_bytes(self, count):
1611
"""See SmartClientStreamMedium._read_bytes."""
1612
return self._readable_pipe.read(count)
1615
class SmartSSHClientMedium(SmartClientStreamMedium):
1616
"""A client medium using SSH."""
1618
def __init__(self, host, port=None, username=None, password=None,
1620
"""Creates a client that will connect on the first use.
1622
:param vendor: An optional override for the ssh vendor to use. See
1623
bzrlib.transport.ssh for details on ssh vendors.
1625
SmartClientStreamMedium.__init__(self)
1626
self._connected = False
1628
self._password = password
1630
self._username = username
1631
self._read_from = None
1632
self._ssh_connection = None
1633
self._vendor = vendor
1634
self._write_to = None
1636
def _accept_bytes(self, bytes):
1637
"""See SmartClientStreamMedium.accept_bytes."""
1638
self._ensure_connection()
1639
self._write_to.write(bytes)
1641
def disconnect(self):
1642
"""See SmartClientMedium.disconnect()."""
1643
if not self._connected:
1645
self._read_from.close()
1646
self._write_to.close()
1647
self._ssh_connection.close()
1648
self._connected = False
1650
def _ensure_connection(self):
1651
"""Connect this medium if not already connected."""
1654
executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
1655
if self._vendor is None:
1656
vendor = ssh._get_ssh_vendor()
1658
vendor = self._vendor
1659
self._ssh_connection = vendor.connect_ssh(self._username,
1660
self._password, self._host, self._port,
1661
command=[executable, 'serve', '--inet', '--directory=/',
1663
self._read_from, self._write_to = \
1664
self._ssh_connection.get_filelike_channels()
1665
self._connected = True
1668
"""See SmartClientStreamMedium._flush()."""
1669
self._write_to.flush()
1671
def _read_bytes(self, count):
1672
"""See SmartClientStreamMedium.read_bytes."""
1673
if not self._connected:
1674
raise errors.MediumNotConnected(self)
1675
return self._read_from.read(count)
1678
class SmartTCPClientMedium(SmartClientStreamMedium):
1679
"""A client medium using TCP."""
1681
def __init__(self, host, port):
1682
"""Creates a client that will connect on the first use."""
1683
SmartClientStreamMedium.__init__(self)
1684
self._connected = False
1689
def _accept_bytes(self, bytes):
1690
"""See SmartClientMedium.accept_bytes."""
1691
self._ensure_connection()
1692
self._socket.sendall(bytes)
1694
def disconnect(self):
1695
"""See SmartClientMedium.disconnect()."""
1696
if not self._connected:
1698
self._socket.close()
1700
self._connected = False
1702
def _ensure_connection(self):
1703
"""Connect this medium if not already connected."""
1706
self._socket = socket.socket()
1707
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
1708
result = self._socket.connect_ex((self._host, int(self._port)))
1710
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1711
(self._host, self._port, os.strerror(result)))
1712
self._connected = True
1715
"""See SmartClientStreamMedium._flush().
1717
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
1718
add a means to do a flush, but that can be done in the future.
1721
def _read_bytes(self, count):
1722
"""See SmartClientMedium.read_bytes."""
1723
if not self._connected:
1724
raise errors.MediumNotConnected(self)
1725
return self._socket.recv(count)
1728
class SmartTCPTransport(SmartTransport):
1729
"""Connection to smart server over plain tcp.
1731
This is essentially just a factory to get 'RemoteTransport(url,
1732
SmartTCPClientMedium).
1735
def __init__(self, url):
1736
_scheme, _username, _password, _host, _port, _path = \
1737
transport.split_url(url)
1739
_port = BZR_DEFAULT_PORT
1743
except (ValueError, TypeError), e:
1744
raise errors.InvalidURL(
1745
path=url, extra="invalid port %s" % _port)
1746
medium = SmartTCPClientMedium(_host, _port)
1747
super(SmartTCPTransport, self).__init__(url, medium=medium)
1750
class SmartSSHTransport(SmartTransport):
1751
"""Connection to smart server over SSH.
1753
This is essentially just a factory to get 'RemoteTransport(url,
1754
SmartSSHClientMedium).
1757
def __init__(self, url):
1758
_scheme, _username, _password, _host, _port, _path = \
1759
transport.split_url(url)
1761
if _port is not None:
1763
except (ValueError, TypeError), e:
1764
raise errors.InvalidURL(path=url, extra="invalid port %s" %
1766
medium = SmartSSHClientMedium(_host, _port, _username, _password)
1767
super(SmartSSHTransport, self).__init__(url, medium=medium)
1770
class SmartHTTPTransport(SmartTransport):
1771
"""Just a way to connect between a bzr+http:// url and http://.
1773
This connection operates slightly differently than the SmartSSHTransport.
1774
It uses a plain http:// transport underneath, which defines what remote
1775
.bzr/smart URL we are connected to. From there, all paths that are sent are
1776
sent as relative paths, this way, the remote side can properly
1777
de-reference them, since it is likely doing rewrite rules to translate an
1778
HTTP path into a local path.
1781
def __init__(self, url, http_transport=None):
1782
assert url.startswith('bzr+http://')
1784
if http_transport is None:
1785
http_url = url[len('bzr+'):]
1786
self._http_transport = transport.get_transport(http_url)
1788
self._http_transport = http_transport
1789
http_medium = self._http_transport.get_smart_medium()
1790
super(SmartHTTPTransport, self).__init__(url, medium=http_medium)
1792
def _remote_path(self, relpath):
1793
"""After connecting HTTP Transport only deals in relative URLs."""
1799
def abspath(self, relpath):
1800
"""Return the full url to the given relative path.
1802
:param relpath: the relative path or path components
1803
:type relpath: str or list
1805
return self._unparse_url(self._combine_paths(self._path, relpath))
1807
def clone(self, relative_url):
1808
"""Make a new SmartHTTPTransport related to me.
1810
This is re-implemented rather than using the default
1811
SmartTransport.clone() because we must be careful about the underlying
1815
abs_url = self.abspath(relative_url)
1818
# By cloning the underlying http_transport, we are able to share the
1820
new_transport = self._http_transport.clone(relative_url)
1821
return SmartHTTPTransport(abs_url, http_transport=new_transport)
1824
def get_test_permutations():
1825
"""Return (transport, server) permutations for testing."""
1826
### We may need a little more test framework support to construct an
1827
### appropriate RemoteTransport in the future.
1828
return [(SmartTCPTransport, SmartTCPServer_for_testing)]