# Copyright (C) 2006 Canonical Ltd
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Smart-server protocol, client and server.

Requests are sent as a command and list of arguments, followed by optional
bulk body data. Responses are similarly a response and list of arguments,
followed by bulk body data. ::

  Fields are separated by Ctrl-A.

  BULK_DATA := CHUNK+ TRAILER
    Chunks can be repeated as many times as necessary.

  CHUNK := CHUNK_LEN CHUNK_BODY

  CHUNK_LEN := DIGIT+ NEWLINE
    Gives the number of bytes in the following chunk.

  CHUNK_BODY := BYTE[chunk_len]
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
  SUCCESS_TRAILER := 'done' NEWLINE
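
The framing above can be sketched in a few lines. This is a minimal Python 3
illustration, not the module's own code: encode_args, decode_args,
encode_bulk and decode_bulk are hypothetical names (the module's counterparts
are _encode_tuple, _decode_tuple and _encode_bulk_data), and it handles only a
single CHUNK per body, which is all the current implementation supports.
ERROR_TRAILER is not modelled; anything other than 'done' is simply rejected.

```python
# Hypothetical helpers (not bzrlib API) illustrating the version-one framing.
SEP = b'\x01'        # Ctrl-A separates the fields of a request/response line
TRAILER = b'done\n'  # SUCCESS_TRAILER


def encode_args(args):
    # One line: UTF-8 encoded fields joined by Ctrl-A, terminated by a newline.
    return SEP.join(a.encode('utf-8') for a in args) + b'\n'


def decode_args(line):
    if not line.endswith(b'\n'):
        raise ValueError('request %r not terminated' % (line,))
    return tuple(f.decode('utf-8') for f in line[:-1].split(SEP))


def encode_bulk(body):
    # A single CHUNK (CHUNK_LEN NEWLINE CHUNK_BODY) followed by the trailer.
    return b'%d\n' % len(body) + body + TRAILER


def decode_bulk(data):
    # Parse one chunk plus trailer; return (body, bytes left over after it).
    newline = data.index(b'\n')
    length = int(data[:newline])
    start = newline + 1
    body = data[start:start + length]
    if len(body) != length:
        raise ValueError('short read fetching bulk data chunk')
    rest = data[start + length:]
    if not rest.startswith(TRAILER):
        # An ERROR_TRAILER would arrive here; this sketch does not decode it.
        raise ValueError('expected success trailer, got %r' % (rest[:10],))
    return body, rest[len(TRAILER):]


request = encode_args(('get', 'branch/README'))
print(request)                                    # b'get\x01branch/README\n'
print(decode_args(b'ok\n'))                       # ('ok',)
print(decode_bulk(encode_bulk(b'hello') + b'x'))  # (b'hello', b'x')
```

Returning the leftover bytes mirrors the unused_data idea used by the real
decoder: whatever follows the trailer belongs to the next message.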

Paths are passed across the network. The client needs to see a namespace that
includes any repository that might need to be referenced, and the client needs
to know about a root directory beyond which it cannot ascend.

Servers run over ssh will typically want to be able to access any path the user
can access. Public servers, on the other hand (which might be over http, ssh
or tcp), will typically want to restrict access to a particular directory
and its children, so they will want to implement a software virtual root at
that level. In other words, they'll want to rewrite incoming paths to be under
that level (and prevent escaping using ../ tricks).
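
One way to implement such a virtual root is purely lexical: treat every
incoming path as relative to the root, normalise it, and refuse anything that
lands outside. The sketch below assumes POSIX-style paths; rebase_under_root
and PathEscapeError are hypothetical names. It does not resolve symlinks, so a
link pointing outside the root would still need separate handling.

```python
import posixpath


class PathEscapeError(ValueError):
    # Hypothetical error raised when a client path would leave the virtual root.
    pass


def rebase_under_root(root, client_path):
    # Absolute client paths are rebased under the root rather than used as-is.
    root = posixpath.normpath(root)
    candidate = posixpath.normpath(
        posixpath.join(root, client_path.lstrip('/')))
    # normpath collapses '..' lexically, so an escape shows up as a prefix
    # mismatch; comparing against root + '/' also rejects '/srv/bzrx'.
    prefix = root.rstrip('/') + '/'
    if candidate != root and not candidate.startswith(prefix):
        raise PathEscapeError(client_path)
    return candidate


print(rebase_under_root('/srv/bzr', '/etc/passwd'))  # /srv/bzr/etc/passwd
print(rebase_under_root('/srv/bzr', 'a/../b'))       # /srv/bzr/b
```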

URLs that include ~ should probably be passed across to the server verbatim
so that the server can expand them. This will probably not be meaningful when
access is limited to a directory.

At the bottom level are sockets, pipes and the HTTP server. For sockets, we
expect multiple requests per connection and get a read error once the other
side has shut down its sending side. For pipes, a zero-length read on the
read pipe marks end-of-file. In an HTTP server environment there is no
end-of-stream, because each request coming into the server is independent.
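
For pipes, that end-of-file condition is simply a read that returns no bytes
once the writer has closed its end. A minimal Python 3 illustration using
os.pipe follows; the helper name read_until_eof is hypothetical (the module
itself reads with readline() and treats an empty line as the client having
gone away).

```python
import os


def read_until_eof(read_fd, chunk_size=4096):
    # os.read returns b'' only once every write end of the pipe is closed
    # and no data remains; that zero-length read is the end-of-file marker.
    chunks = []
    while True:
        data = os.read(read_fd, chunk_size)
        if not data:
            break
        chunks.append(data)
    return b''.join(chunks)


r, w = os.pipe()
os.write(w, b'get\x01README\n')
os.close(w)  # without this, read_until_eof would block waiting for more data
print(read_until_eof(r))  # b'get\x01README\n'
os.close(r)
```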

So we need a wrapper around pipes and sockets to separate out requests from
the substrate, and this will give us a single model which is consistent for HTTP,

 MEDIUM  (factory for protocol, reads bytes & pushes to protocol,
          uses protocol to detect end-of-request, sends written
          bytes to client) e.g. socket, pipe, HTTP request handler.

PROTOCOL  (serialisation, deserialisation) accepts bytes for one
          request, decodes according to internal state, pushes
          structured data to handler. Accepts structured data from
          handler and encodes and writes to the medium. Factory for

HANDLER   (domain logic) accepts structured data, operates state
          machine until the request can be satisfied,
          sends structured data to the protocol.
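
The server-side split can be illustrated with three toy classes. This is a
hypothetical Python 3 sketch, not the bzrlib classes: StreamMedium,
LineProtocol and UpperHandler are invented names, although accept_bytes and
finished_reading mirror what SmartServerRequestProtocolOne exposes. The medium
pushes bytes into a fresh protocol until the protocol says the request is
complete; only the handler knows what the request means.

```python
import io


class UpperHandler:
    # HANDLER: domain logic only; takes and returns structured data (tuples).
    def handle(self, args):
        return ('ok',) + tuple(a.upper() for a in args)


class LineProtocol:
    # PROTOCOL: decodes one request from bytes, passes the structured data to
    # the handler, and encodes the handler's answer back onto the medium.
    def __init__(self, out_file, handler):
        self._out = out_file
        self._handler = handler
        self._buffer = b''
        self.finished_reading = False

    def accept_bytes(self, data):
        self._buffer += data
        if b'\n' not in self._buffer:
            return  # request line not complete yet
        line = self._buffer.split(b'\n', 1)[0]
        args = tuple(f.decode('utf-8') for f in line.split(b'\x01'))
        response = self._handler.handle(args)
        self._out.write(b'\x01'.join(a.encode('utf-8') for a in response) + b'\n')
        self.finished_reading = True


class StreamMedium:
    # MEDIUM: owns the byte stream, creates one protocol per request and
    # feeds it bytes until the protocol reports the end of the request.
    def __init__(self, in_file, out_file, handler):
        self._in = in_file
        self._out = out_file
        self._handler = handler

    def serve_one_request(self):
        protocol = LineProtocol(self._out, self._handler)
        while not protocol.finished_reading:
            # One byte at a time so no bytes of the next request are consumed.
            data = self._in.read(1)
            if not data:
                return False  # end of stream: the client has gone away
            protocol.accept_bytes(data)
        return True


requests = io.BytesIO(b'hello\x01world\nbye\n')
responses = io.BytesIO()
medium = StreamMedium(requests, responses, UpperHandler())
while medium.serve_one_request():
    pass
print(responses.getvalue())  # b'ok\x01HELLO\x01WORLD\nok\x01BYE\n'
```

Reading a byte at a time is only for clarity; a real medium would read larger
blocks and hand any surplus to the next protocol instance.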

 CLIENT   domain logic, accepts domain requests, generates structured
          data, reads structured data from responses and turns it into
          domain data. Sends structured data to the protocol.
          Operates state machines until the request can be delivered
          (e.g. reading from a bundle generated in bzrlib to deliver a

          Possibly this should just be RemoteBzrDir, RemoteTransport,

PROTOCOL  (serialisation, deserialisation) accepts structured data for one
          request, encodes and writes to the medium. Reads bytes from the
          medium, decodes and allows the client to read structured data.

 MEDIUM  (accepts bytes from the protocol & delivers to the remote server.
          Allows the protocol to read bytes e.g. socket, pipe, HTTP request.
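
The client-side stack runs the other way: the CLIENT turns a domain request
into structured data, and the PROTOCOL encodes it onto the medium and decodes
the reply. A hypothetical Python 3 sketch follows (ClientProtocol and
remote_get are invented names; SmartTransport.get does the equivalent through
_call and _recv_bulk), with in-memory buffers standing in for the medium.

```python
import io


class ClientProtocol:
    # Client-side PROTOCOL: encodes structured requests onto the medium and
    # decodes the bytes that come back into structured data.
    def __init__(self, from_server, to_server):
        self._in = from_server
        self._out = to_server

    def call(self, *args):
        self._out.write(b'\x01'.join(a.encode('utf-8') for a in args) + b'\n')
        self._out.flush()
        line = self._in.readline()
        return tuple(f.decode('utf-8') for f in line.rstrip(b'\n').split(b'\x01'))

    def read_body(self):
        # One length-prefixed chunk followed by the 'done' trailer.
        length = int(self._in.readline().strip())
        body = self._in.read(length)
        if len(body) != length or self._in.readline() != b'done\n':
            raise ValueError('malformed bulk data')
        return body


def remote_get(protocol, path):
    # CLIENT: domain logic that turns "fetch this file" into a 'get' request.
    resp = protocol.call('get', path)
    if resp != ('ok',):
        raise RuntimeError('server refused get: %r' % (resp,))
    return protocol.read_body()


reply = io.BytesIO(b'ok\n5\nhellodone\n')  # canned server response
sent = io.BytesIO()
print(remote_get(ClientProtocol(reply, sent), 'README'))  # b'hello'
print(sent.getvalue())                                     # b'get\x01README\n'
```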
"""

# TODO: _translate_error should be on the client, not the transport because
# error coding is wire protocol specific.

# TODO: A plain integer from query_version is too simple; should give some

# TODO: Server should probably catch exceptions within itself and send them
# back across the network. (But shouldn't catch KeyboardInterrupt etc)
# Also needs to somehow report protocol errors like bad requests. Need to
# consider how we'll handle error reporting, e.g. if we get halfway through a
# bulk transfer and then something goes wrong.

# TODO: Standard marker at start of request/response lines?

# TODO: Make each request and response self-validatable, e.g. with checksums.

# TODO: get/put objects could be changed to gradually read back the data as it
# comes across the network

# TODO: What should the server do if it hits an error and has to terminate?

# TODO: is it useful to allow multiple chunks in the bulk data?

# TODO: If we get an exception during transmission of bulk data we can't just
# emit the exception because it won't be seen.
# John proposes: I think it would be worthwhile to have a header on each
# chunk, that indicates it is another chunk. Then you can send an 'error'
# chunk as long as you finish the previous chunk.

# TODO: Clone method on Transport; should work up towards parent directory;
# unclear how this should be stored or communicated to the server... maybe
# just pass it on all relevant requests?

# TODO: Better name than clone() for changing between directories. How about
# open_dir or change_dir or chdir?

# TODO: Is it really good to have the notion of current directory within the
# connection? Perhaps all Transports should factor out a common connection
# from the thing that has the directory context?

# TODO: Pull more things common to sftp and ssh to a higher level.

# TODO: The server that manages a connection should be quite small and retain
# minimum state because each request is supposed to be stateless.
# Then we can write another implementation that maps to http.

# TODO: What to do when a client connection is garbage collected? Maybe just
# abruptly drop the connection?

# TODO: Server in some cases will need to restrict access to files outside of
# a particular root directory. LocalTransport doesn't do anything to stop you
# ascending above the base directory, so we need to prevent paths
# containing '..' in either the server or transport layers. (Also need to
# consider what happens if someone creates a symlink pointing outside the

# TODO: Server should rebase absolute paths coming across the network to put
# them under the virtual root, if one is in use. LocalTransport currently
# doesn't do that; if you give it an absolute path it just uses it.

# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
# urlescape them instead. Indeed possibly this should just literally be

# FIXME: This transport, with several others, has imperfect handling of paths
# within urls. It'd probably be better for ".." from a root to raise an error
# rather than return the same directory as we do at present.

# TODO: Rather than working at the Transport layer we want Branch,
# Repository or BzrDir objects that talk to a server.

# TODO: Probably want some way for server commands to gradually produce body
# data rather than passing it as a string; they could perhaps pass an
# iterator-like callback that will gradually yield data; it probably needs a
# close() method that will always be called to do any necessary cleanup.

# TODO: Split the actual smart server from the ssh encoding of it.

# TODO: Perhaps support file-level readwrite operations over the transport

# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
# branch doing file-level operations.

# TODO: jam 20060915 _decode_tuple is acting directly on input over
# the socket, and it assumes everything is UTF8 sections separated
# by \001. This means a request like '\002' will abort the connection
# because of a UnicodeDecodeError. It does look like invalid data will
# kill the SmartServerStreamMedium, but only with an abort + exception, and
# the overall server shouldn't die.

from cStringIO import StringIO

from bzrlib.bundle.serializer import write_bundle
from bzrlib.trace import mutter
from bzrlib.transport import local

# must do this otherwise urllib can't parse the urls properly :(
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
    transport.register_urlparse_netloc_protocol(scheme)


def _recv_tuple(from_file):
    req_line = from_file.readline()
    return _decode_tuple(req_line)


def _decode_tuple(req_line):
    if req_line == None or req_line == '':
        return None
    if req_line[-1] != '\n':
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
    return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))


def _encode_tuple(args):
    """Encode the tuple args to a bytestream."""
    return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'


class SmartProtocolBase(object):
    """Methods common to client and server"""

    def _send_bulk_data(self, body, a_file=None):
        """Send chunked body data"""
        assert isinstance(body, str)
        bytes = self._encode_bulk_data(body)
        self._write_and_flush(bytes, a_file)

    def _encode_bulk_data(self, body):
        """Encode body as a bulk data chunk."""
        return ''.join(('%d\n' % len(body), body, 'done\n'))

    # TODO: this only actually accommodates a single block; possibly should support
    def _recv_bulk(self):
        chunk_len = self._in.readline()
        try:
            chunk_len = int(chunk_len)
        except ValueError:
            raise errors.SmartProtocolError("bad chunk length line %r" % chunk_len)
        bulk = self._in.read(chunk_len)
        if len(bulk) != chunk_len:
            raise errors.SmartProtocolError("short read fetching bulk data chunk")
        self._recv_trailer()
        return bulk

    def _recv_tuple(self):
        return _recv_tuple(self._in)

    def _recv_trailer(self):
        resp = self._recv_tuple()
        if resp == ('done', ):
            return
        else:
            self._translate_error(resp)

    def _serialise_offsets(self, offsets):
        """Serialise a readv offset list."""
        txt = []
        for start, length in offsets:
            txt.append('%d,%d' % (start, length))
        return '\n'.join(txt)

    def _write_and_flush(self, bytes, a_file=None):
        """Write bytes to self._out and flush it."""
        # XXX: this will be inefficient. Just ask Robert.
        if a_file is None:
            a_file = self._out
        a_file.write(bytes)
        a_file.flush()


class SmartServerRequestProtocolOne(SmartProtocolBase):
    """Server-side encoding and decoding logic for smart version 1."""

    def __init__(self, output_stream, backing_transport):
        self._out_stream = output_stream
        self._backing_transport = backing_transport
        self.finished_reading = False
        self.in_buffer = ''
        self.has_dispatched = False
        self._body_decoder = None

    def accept_bytes(self, bytes):
        """Take bytes, and advance the internal state machine appropriately.

        :param bytes: must be a byte string
        """
        assert isinstance(bytes, str)
        self.in_buffer += bytes
        if not self.has_dispatched:
            if '\n' not in self.in_buffer:
                # no command line yet
                return
            self.has_dispatched = True
            # XXX if in_buffer not \n-terminated this will do the wrong
            # thing.
            try:
                assert self.in_buffer.endswith('\n')
                req_args = _decode_tuple(self.in_buffer)
                self.in_buffer = ''
                self.request = SmartServerRequestHandler(
                    self._backing_transport)
                self.request.dispatch_command(req_args[0], req_args[1:])
                if self.request.finished_reading:
                    self._send_response(self.request.response.args,
                        self.request.response.body)
                self.sync_with_request(self.request)
            except KeyboardInterrupt:
                raise
            except Exception, exception:
                # everything else: pass to client, flush, and quit
                self._send_response(('error', str(exception)))
        else:
            if self.finished_reading:
                # nothing to do. XXX: this routine should be a single state
                # machine.
                return
            if self._body_decoder is None:
                self._body_decoder = LengthPrefixedBodyDecoder()
            self._body_decoder.accept_bytes(self.in_buffer)
            self.in_buffer = self._body_decoder.unused_data
            body_data = self._body_decoder.read_pending_data()
            self.request.accept_body(body_data)
            if self._body_decoder.finished_reading:
                self.request.end_of_body()
                assert self.request.finished_reading, \
                    "no more body, request not finished"
            self.sync_with_request(self.request)
            if self.request.response is not None:
                self._send_response(self.request.response.args,
                    self.request.response.body)
            else:
                assert not self.request.finished_reading, \
                    "no response and we have finished reading."

    def _send_response(self, args, body=None):
        """Send a smart server response down the output stream."""
        self._out_stream.write(_encode_tuple(args))
        if body is None:
            self._out_stream.flush()
        else:
            self._send_bulk_data(body, self._out_stream)
            #self._out_stream.write('BLARGH')

    def sync_with_request(self, request):
        self.finished_reading = request.finished_reading


class LengthPrefixedBodyDecoder(object):
    """Decodes the length-prefixed bulk data."""

    def __init__(self):
        self.finished_reading = False
        self.unused_data = ''
        self.state_accept = self._state_accept_expecting_length
        self.state_read = self._state_read_no_data
        self._in_buffer = ''
        self._trailer_buffer = ''

    def accept_bytes(self, bytes):
        """Decode as much of bytes as possible.

        If 'bytes' contains too much data it will be appended to
        self.unused_data.

        finished_reading will be set when no more data is required. Further
        data will be appended to self.unused_data.
        """
        # accept_bytes is allowed to change the state
        current_state = self.state_accept
        self.state_accept(bytes)
        while current_state != self.state_accept:
            current_state = self.state_accept
            self.state_accept('')

    def read_pending_data(self):
        """Return any pending data that has been decoded."""
        return self.state_read()

    def _state_accept_expecting_length(self, bytes):
        self._in_buffer += bytes
        pos = self._in_buffer.find('\n')
        if pos == -1:
            return
        self.bytes_left = int(self._in_buffer[:pos])
        self._in_buffer = self._in_buffer[pos+1:]
        self.bytes_left -= len(self._in_buffer)
        self.state_accept = self._state_accept_reading_body
        self.state_read = self._state_read_in_buffer

    def _state_accept_reading_body(self, bytes):
        self._in_buffer += bytes
        self.bytes_left -= len(bytes)
        if self.bytes_left <= 0:
            if self.bytes_left != 0:
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
                self._in_buffer = self._in_buffer[:self.bytes_left]
            self.state_accept = self._state_accept_reading_trailer

    def _state_accept_reading_trailer(self, bytes):
        self._trailer_buffer += bytes
        if self._trailer_buffer.startswith('done\n'):
            self.unused_data = self._trailer_buffer[len('done\n'):]
            self.state_accept = self._state_accept_reading_unused
            self.finished_reading = True

    def _state_accept_reading_unused(self, bytes):
        self.unused_data += bytes

    def _state_read_no_data(self):
        return ''

    def _state_read_in_buffer(self):
        result = self._in_buffer
        self._in_buffer = ''
        return result


class SmartServerStreamMedium(SmartProtocolBase):
    """Handles smart commands coming over a stream.

    The stream may be a pipe connected to sshd, or a tcp socket, or an
    in-process fifo for testing.

    One instance is created for each connected client; it can serve multiple
    requests in the lifetime of the connection.

    The server passes requests through to an underlying backing transport,
    which will typically be a LocalTransport looking at the server's filesystem.
    """

    def __init__(self, in_file, out_file, backing_transport):
        """Construct new server.

        :param in_file: Python file from which requests can be read.
        :param out_file: Python file to write responses.
        :param backing_transport: Transport for the directory served.
        """
        self._in = in_file
        self._out = out_file
        self.backing_transport = backing_transport

    def _recv_tuple(self):
        """Read a request from the client and return as a tuple.

        Returns None at end of file (if the client closed the connection).
        """
        # ** Deserialise and read bytes
        return _recv_tuple(self._in)

    def _send_tuple(self, args):
        """Send response header"""
        # ** serialise and write bytes
        return self._write_and_flush(_encode_tuple(args))

    def _send_error_and_disconnect(self, exception):
        # ** serialise and write bytes
        self._send_tuple(('error', str(exception)))

    def _serve_one_request(self):
        """Read one request from input, process, send back a response.

        :return: False if the server should terminate, otherwise None.
        """
        # ** deserialise, read bytes, serialise and write bytes
        req_line = self._in.readline()
        # this should just test "req_line == ''", surely? -- Andrew Bennetts
        if req_line in ('', None):
            # client closed connection
            return False  # shutdown server
        try:
            protocol = SmartServerRequestProtocolOne(self._out,
                self.backing_transport)
            protocol.accept_bytes(req_line)
            if not protocol.finished_reading:
                # this boils down to readline which won't block on open sockets
                # without data. We should really though read as much as is
                # available and then hand it to accept_bytes without this
                # silly double-decode.
                bulk = self._recv_bulk()
                bulk_bytes = ''.join(('%d\n' % len(bulk), bulk, 'done\n'))
                protocol.accept_bytes(bulk_bytes)
                # might be nice to do protocol.end_of_bytes()
                # because self._recv_bulk reads all the bytes, this must finish
                # after one delivery of data rather than looping.
                assert protocol.finished_reading
        except KeyboardInterrupt:
            raise
        except Exception, e:
            # everything else: pass to client, flush, and quit
            self._send_error_and_disconnect(e)
            return False

    def serve(self):
        """Serve requests until the client disconnects."""
        # Keep a reference to stderr because the sys module's globals get set to
        # None during interpreter shutdown.
        from sys import stderr
        try:
            while self._serve_one_request() != False:
                pass
        except Exception, e:
            stderr.write("%s terminating on exception %s\n" % (self, e))


class SmartServerResponse(object):
    """Response generated by SmartServerRequestHandler."""

    def __init__(self, args, body=None):
        self.args = args
        self.body = body

# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
# for delivering the data for a request. This could be done with as the
# StreamServer, though that would create conflation between request and response
# which may be undesirable.


class SmartServerRequestHandler(object):
    """Protocol logic for smart server.

    This doesn't handle serialization at all, it just processes requests and
    creates responses.
    """

    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
    # not contain encoding or decoding logic to allow the wire protocol to vary
    # from the object protocol: we will want to tweak the wire protocol separate
    # from the object model, and ideally we will be able to do that without
    # having a SmartServerRequestHandler subclass for each wire protocol, rather
    # just a Protocol subclass.

    # TODO: Better way of representing the body for commands that take it,
    # and allow it to be streamed into the server.

    def __init__(self, backing_transport):
        self._backing_transport = backing_transport
        self._converted_command = False
        self.finished_reading = False
        self._body_bytes = ''
        self.response = None

    def accept_body(self, bytes):
        """This should be overridden for each command that desires body data,
        to handle the right format of that data, e.g. plain bytes, a bundle, etc.

        The deserialisation into that format should be done in the Protocol
        object. Set self.desired_body_format to the format your method will
        """
        # default fallback is to accumulate bytes.
        self._body_bytes += bytes

    def _end_of_body_handler(self):
        """An unimplemented end of body handler."""
        raise NotImplementedError(self._end_of_body_handler)

        """Answer a version request with my version."""
        return SmartServerResponse(('ok', '1'))

    def do_has(self, relpath):
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
        return SmartServerResponse((r,))

    def do_get(self, relpath):
        backing_bytes = self._backing_transport.get_bytes(relpath)
        return SmartServerResponse(('ok',), backing_bytes)

    def _deserialise_optional_mode(self, mode):
        # XXX: FIXME this should be on the protocol object.

    def do_append(self, relpath, mode):
        self._converted_command = True
        self._relpath = relpath
        self._mode = self._deserialise_optional_mode(mode)
        self._end_of_body_handler = self._handle_do_append_end

    def _handle_do_append_end(self):
        old_length = self._backing_transport.append_bytes(
            self._relpath, self._body_bytes, self._mode)
        self.response = SmartServerResponse(('appended', '%d' % old_length))

    def do_delete(self, relpath):
        self._backing_transport.delete(relpath)

    def do_iter_files_recursive(self, abspath):
        # XXX: the path handling needs some thought.
        #relpath = self._backing_transport.relpath(abspath)
        transport = self._backing_transport.clone(abspath)
        filenames = transport.iter_files_recursive()
        return SmartServerResponse(('names',) + tuple(filenames))

    def do_list_dir(self, relpath):
        filenames = self._backing_transport.list_dir(relpath)
        return SmartServerResponse(('names',) + tuple(filenames))

    def do_mkdir(self, relpath, mode):
        self._backing_transport.mkdir(relpath,
                                      self._deserialise_optional_mode(mode))

    def do_move(self, rel_from, rel_to):
        self._backing_transport.move(rel_from, rel_to)

    def do_put(self, relpath, mode):
        self._converted_command = True
        self._relpath = relpath
        self._mode = self._deserialise_optional_mode(mode)
        self._end_of_body_handler = self._handle_do_put

    def _handle_do_put(self):
        self._backing_transport.put_bytes(self._relpath,
                self._body_bytes, self._mode)
        self.response = SmartServerResponse(('ok',))

    def _deserialise_offsets(self, text):
        # XXX: FIXME this should be on the protocol object.
        offsets = []
        for line in text.split('\n'):
            if not line:
                continue
            start, length = line.split(',')
            offsets.append((int(start), int(length)))
        return offsets

    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
        self._converted_command = True
        self._end_of_body_handler = self._handle_put_non_atomic
        self._relpath = relpath
        self._dir_mode = self._deserialise_optional_mode(dir_mode)
        self._mode = self._deserialise_optional_mode(mode)
        # a boolean would be nicer XXX
        self._create_parent = (create_parent == 'T')

    def _handle_put_non_atomic(self):
        self._backing_transport.put_bytes_non_atomic(self._relpath,
                self._body_bytes,
                mode=self._mode,
                create_parent_dir=self._create_parent,
                dir_mode=self._dir_mode)
        self.response = SmartServerResponse(('ok',))

    def do_readv(self, relpath):
        self._converted_command = True
        self._end_of_body_handler = self._handle_readv_offsets
        self._relpath = relpath

    def end_of_body(self):
        """No more body data will be received."""
        self._run_handler_code(self._end_of_body_handler, (), {})
        # cannot read after this.
        self.finished_reading = True

    def _handle_readv_offsets(self):
        """Accept offsets for a readv request."""
        offsets = self._deserialise_offsets(self._body_bytes)
        backing_bytes = ''.join(bytes for offset, bytes in
            self._backing_transport.readv(self._relpath, offsets))
        self.response = SmartServerResponse(('readv',), backing_bytes)

    def do_rename(self, rel_from, rel_to):
        self._backing_transport.rename(rel_from, rel_to)

    def do_rmdir(self, relpath):
        self._backing_transport.rmdir(relpath)

    def do_stat(self, relpath):
        stat = self._backing_transport.stat(relpath)
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))

    def do_get_bundle(self, path, revision_id):
        # open transport relative to our base
        t = self._backing_transport.clone(path)
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
        repo = control.open_repository()
        tmpf = tempfile.TemporaryFile()
        base_revision = revision.NULL_REVISION
        write_bundle(repo, revision_id, base_revision, tmpf)
        # rewind so the bundle just written can be read back
        tmpf.seek(0)
        return SmartServerResponse((), tmpf.read())

    def dispatch_command(self, cmd, args):
        """Deprecated compatibility method.""" # XXX XXX
        func = getattr(self, 'do_' + cmd, None)
        if func is None:
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
        self._run_handler_code(func, args, {})

    def _run_handler_code(self, callable, args, kwargs):
        """Run some handler specific code 'callable'.

        If a result is returned, it is considered to be the command's response:
        finished_reading is set to true and the result is assigned to
        self.response.

        Any exceptions caught are translated and a response object is created.
        """
        result = self._call_converting_errors(callable, args, kwargs)
        if result is not None:
            self.response = result
            self.finished_reading = True
        # handle unconverted commands
        if not self._converted_command:
            self.finished_reading = True
            if result is None:
                self.response = SmartServerResponse(('ok',))

    def _call_converting_errors(self, callable, args, kwargs):
        """Call callable converting errors to Response objects."""
        try:
            return callable(*args, **kwargs)
        except errors.NoSuchFile, e:
            return SmartServerResponse(('NoSuchFile', e.path))
        except errors.FileExists, e:
            return SmartServerResponse(('FileExists', e.path))
        except errors.DirectoryNotEmpty, e:
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
        except errors.ShortReadvError, e:
            return SmartServerResponse(('ShortReadvError',
                e.path, str(e.offset), str(e.length), str(e.actual)))
        except UnicodeError, e:
            # If it is a DecodeError, then most likely we are starting
            # with a plain string
            str_or_unicode = e.object
            if isinstance(str_or_unicode, unicode):
                val = u'u:' + str_or_unicode
            else:
                val = u's:' + str_or_unicode.encode('base64')
            # This handles UnicodeEncodeError or UnicodeDecodeError
            return SmartServerResponse((e.__class__.__name__,
                    e.encoding, val, str(e.start), str(e.end), e.reason))
        except errors.TransportNotPossible, e:
            if e.msg == "readonly transport":
                return SmartServerResponse(('ReadOnlyError', ))
            else:
                raise


class SmartTCPServer(object):
    """Listens on a TCP socket and accepts connections from smart clients"""

    def __init__(self, backing_transport=None, host='127.0.0.1', port=0):
        """Construct a new server.

        To actually start it running, call either start_background_thread or

        :param host: Name of the interface to listen on.
        :param port: TCP port to listen on, or 0 to allocate a transient port.
        """
        if backing_transport is None:
            backing_transport = memory.MemoryTransport()
        self._server_socket = socket.socket()
        self._server_socket.bind((host, port))
        self.port = self._server_socket.getsockname()[1]
        self._server_socket.listen(1)
        self._server_socket.settimeout(1)
        self.backing_transport = backing_transport

        # let connections timeout so that we get a chance to terminate
        # Keep a reference to the exceptions we want to catch because the socket
        # module's globals get set to None during interpreter shutdown.
        from socket import timeout as socket_timeout
        from socket import error as socket_error
        self._should_terminate = False
        while not self._should_terminate:
            try:
                self.accept_and_serve()
            except socket_timeout:
                # just check if we're asked to stop
                pass
            except socket_error, e:
                trace.warning("client disconnected: %s", e)

    def get_url(self):
        """Return the url of the server"""
        return "bzr://%s:%d/" % self._server_socket.getsockname()

    def accept_and_serve(self):
        conn, client_addr = self._server_socket.accept()
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        from_client = conn.makefile('r')
        to_client = conn.makefile('w')
        handler = SmartServerStreamMedium(from_client, to_client,
                self.backing_transport)
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
        connection_thread.setDaemon(True)
        connection_thread.start()

    def start_background_thread(self):
        self._server_thread = threading.Thread(None,
                name='server-' + self.get_url())
        self._server_thread.setDaemon(True)
        self._server_thread.start()

    def stop_background_thread(self):
        self._should_terminate = True
        # self._server_socket.close()
        # we used to join the thread, but it's not really necessary; it will
        ## self._server_thread.join()


class SmartTCPServer_for_testing(SmartTCPServer):
    """Server suitable for use by transport tests.

    This server is backed by the process's cwd.
    """

    def __init__(self):
        self._homedir = os.getcwd()
        # The server is set up by default like for ssh access: the client
        # passes filesystem-absolute paths; therefore the server must look
        # them up relative to the root directory. It might be better to act
        # as a public server and have the server rewrite paths into the test
        # directory.
        SmartTCPServer.__init__(self, transport.get_transport("file:///"))

        """Set up server for testing"""
        self.start_background_thread()

        self.stop_background_thread()

    def get_url(self):
        """Return the url of the server"""
        host, port = self._server_socket.getsockname()
        # XXX: I think this is likely to break on windows -- self._homedir will
        # have backslashes (and maybe a drive letter?).
        #  -- Andrew Bennetts, 2006-08-29
        return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))

    def get_bogus_url(self):
        """Return a URL which will fail to connect"""
        return 'bzr://127.0.0.1:1/'


class SmartStat(object):

    def __init__(self, size, mode):
        self.st_size = size
        self.st_mode = mode


class SmartTransport(transport.Transport):
    """Connection to a smart server.

    The connection holds references to pipes that can be used to send requests

    The connection has a notion of the current directory to which it's
    connected; this is incorporated in filenames passed to the server.

    This supports some higher-level RPC operations and can also be treated
    like a Transport to do file-like operations.

    The connection can be made over a tcp socket, or (in future) an ssh pipe
    or a series of http requests. There are concrete subclasses for each
    type: SmartTCPTransport, etc.
    """

    # IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
    # responsibilities: Put those on SmartClient or similar. This is vital for
    # the ability to support multiple versions of the smart protocol over time:
    # SmartTransport is an adapter from the Transport object model to the
    # SmartClient model, not an encoder.

    def __init__(self, url, clone_from=None, medium=None):
        """
        :param medium: The medium to use for this SmartTransport. This must be
            supplied if clone_from is None.
        """
        ### Technically super() here is faulty because Transport's __init__
        ### fails to take 2 parameters, and if super were to choose a silly
        ### initialisation order things would blow up.
        if not url.endswith('/'):
            url += '/'
        super(SmartTransport, self).__init__(url)
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
                transport.split_url(url)
        if clone_from is None:
            self._client = medium
        else:
            # credentials may be stripped from the base in some circumstances
            # as yet to be clearly defined or documented, so copy them.
            self._username = clone_from._username
            # reuse same connection
            self._client = clone_from._client
        assert self._client is not None
944
def abspath(self, relpath):
945
"""Return the full url to the given relative path.
947
@param relpath: the relative path or path components
948
@type relpath: str or list
950
return self._unparse_url(self._remote_path(relpath))

    def clone(self, relative_url):
        """Make a new SmartTransport related to me, sharing the same connection.

        This essentially opens a handle on a different remote directory.
        """
        if relative_url is None:
            return SmartTransport(self.base, self)
        else:
            return SmartTransport(self.abspath(relative_url), self)

    def is_readonly(self):
        """Smart server transport can do read/write file operations."""
        return False

    def get_smart_client(self):
        return self._client

    def get_smart_medium(self):
        return self._client

    def _unparse_url(self, path):
        """Return URL for a path.

        :see: SFTPUrlHandling._unparse_url
        """
        # TODO: Eventually it should be possible to unify this with
        # SFTPUrlHandling._unparse_url?
        path = urllib.quote(path)
        netloc = urllib.quote(self._host)
        if self._username is not None:
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
        if self._port is not None:
            netloc = '%s:%d' % (netloc, self._port)
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
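
    # Illustrative sketch (hypothetical values, not from the original code):
    # consider a transport whose _scheme is 'bzr', _username is 'alice',
    # _host is 'example.com' and _port is 4155. For that transport,
    # _unparse_url('/srv/a b') quotes the path to '/srv/a%20b' and builds:
    #
    #   netloc = 'alice@example.com:4155'
    #   urlparse.urlunparse(('bzr', netloc, '/srv/a%20b', '', '', ''))
    #   # -> 'bzr://alice@example.com:4155/srv/a%20b'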

    def _remote_path(self, relpath):
        """Returns the Unicode version of the absolute path for relpath."""
        return self._combine_paths(self._path, relpath)

    def has(self, relpath):
        """Indicate whether a remote file of the given name exists or not.

        :see: Transport.has()
        """
        resp = self._client._call('has', self._remote_path(relpath))
        if resp == ('yes', ):
            return True
        elif resp == ('no', ):
            return False
        else:
            self._translate_error(resp)

    def get(self, relpath):
        """Return file-like object reading the contents of a remote file.

        :see: Transport.get_bytes()/get_file()
        """
        remote = self._remote_path(relpath)
        resp = self._client._call('get', remote)
        if resp != ('ok', ):
            self._translate_error(resp, relpath)
        return StringIO(self._client._recv_bulk())
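
    def _serialise_optional_mode(self, mode):
        # Encode an optional permission mode as a decimal string; the empty
        # string means that no mode was given.
        if mode is None:
            return ''
        else:
            return '%d' % mode

    def mkdir(self, relpath, mode=None):
        resp = self._client._call('mkdir',
                                  self._remote_path(relpath),
                                  self._serialise_optional_mode(mode))
        self._translate_error(resp)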

    def put_bytes(self, relpath, upload_contents, mode=None):
        # FIXME: upload_contents is probably not safe for non-ascii characters -
        # should probably just pass all parameters as length-delimited
        # strings?
        resp = self._client._call_with_upload(
            'put',
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
            upload_contents)
        self._translate_error(resp)

    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
                             create_parent_dir=False,
                             dir_mode=None):
        """See Transport.put_bytes_non_atomic."""
        # FIXME: no encoding in the transport!
        create_parent_str = 'F'
        if create_parent_dir:
            create_parent_str = 'T'

        resp = self._client._call_with_upload(
            'put_non_atomic',
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
             create_parent_str, self._serialise_optional_mode(dir_mode)),
            bytes)
        self._translate_error(resp)

    def put_file(self, relpath, upload_file, mode=None):
        # It's not ideal to seek back, but currently put_non_atomic_file depends
        # on transports not reading before failing - which is a faulty
        # assumption I think - RBC 20060915
        pos = upload_file.tell()
        try:
            return self.put_bytes(relpath, upload_file.read(), mode)
        except:
            # Restore the original file position before re-raising.
            upload_file.seek(pos)
            raise

    def put_file_non_atomic(self, relpath, f, mode=None,
                            create_parent_dir=False,
                            dir_mode=None):
        return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
                                         create_parent_dir=create_parent_dir,
                                         dir_mode=dir_mode)

    def append_file(self, relpath, from_file, mode=None):
        return self.append_bytes(relpath, from_file.read(), mode)

    def append_bytes(self, relpath, bytes, mode=None):
        resp = self._client._call_with_upload(
            'append',
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
            bytes)
        if resp[0] == 'appended':
            return int(resp[1])
        self._translate_error(resp)

    def delete(self, relpath):
        resp = self._client._call('delete', self._remote_path(relpath))
        self._translate_error(resp)

    def readv(self, relpath, offsets):
        offsets = list(offsets)

        sorted_offsets = sorted(offsets)
        # iterate over the requested offsets in the caller's original order
        offset_stack = iter(offsets)
        cur_offset_and_size = offset_stack.next()
        coalesced = list(self._coalesce_offsets(sorted_offsets,
                               limit=self._max_readv_combine,
                               fudge_factor=self._bytes_to_read_before_seek))

        resp = self._client._call_with_upload(
            'readv',
            (self._remote_path(relpath),),
            self._client._serialise_offsets((c.start, c.length) for c in coalesced))

        if resp[0] != 'readv':
            # This should raise an exception
            self._translate_error(resp)
            return

        data = self._client._recv_bulk()
        # Cache the results, but only until they have been fulfilled
        data_map = {}
        for c_offset in coalesced:
            if len(data) < c_offset.length:
                raise errors.ShortReadvError(relpath, c_offset.start,
                            c_offset.length, actual=len(data))
            for suboffset, subsize in c_offset.ranges:
                key = (c_offset.start+suboffset, subsize)
                data_map[key] = data[suboffset:suboffset+subsize]
            data = data[c_offset.length:]

            # Now that we've read some data, see if we can yield anything back
            while cur_offset_and_size in data_map:
                this_data = data_map.pop(cur_offset_and_size)
                yield cur_offset_and_size[0], this_data
                cur_offset_and_size = offset_stack.next()
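
    # Illustrative sketch (hypothetical data): readv asks the server for the
    # coalesced (start, length) ranges. It then slices each returned range
    # back into the caller's (offset, size) pairs and yields them in the
    # caller's original order. Suppose the caller asks for [(10, 2), (0, 3)]
    # and both requests land in one coalesced range with start 0 and length 12:
    #
    #   data = 'abcdefghijkl'    # the 12 bytes returned for that range
    #   data_map = {(0, 3): data[0:3], (10, 2): data[10:12]}
    #   # readv yields (10, 'kl') first, then (0, 'abc')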

    def rename(self, rel_from, rel_to):
        self._call('rename',
                   self._remote_path(rel_from),
                   self._remote_path(rel_to))

    def move(self, rel_from, rel_to):
        self._call('move',
                   self._remote_path(rel_from),
                   self._remote_path(rel_to))

    def rmdir(self, relpath):
        self._call('rmdir', self._remote_path(relpath))

    def _call(self, method, *args):
        resp = self._client._call(method, *args)
        self._translate_error(resp)

    def _translate_error(self, resp, orig_path=None):
        """Raise an exception from an error response; return if it is 'ok'."""
        what = resp[0]
        if what == 'ok':
            return
        elif what == 'NoSuchFile':
            if orig_path is not None:
                error_path = orig_path
            else:
                error_path = resp[1]
            raise errors.NoSuchFile(error_path)
        elif what == 'error':
            raise errors.SmartProtocolError(unicode(resp[1]))
        elif what == 'FileExists':
            raise errors.FileExists(resp[1])
        elif what == 'DirectoryNotEmpty':
            raise errors.DirectoryNotEmpty(resp[1])
        elif what == 'ShortReadvError':
            raise errors.ShortReadvError(resp[1], int(resp[2]),
                                         int(resp[3]), int(resp[4]))
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
            encoding = str(resp[1]) # encoding must always be a string
            val = resp[2]
            start = int(resp[3])
            end = int(resp[4])
            reason = str(resp[5]) # reason must always be a string
            if val.startswith('u:'):
                val = val[2:]
            elif val.startswith('s:'):
                val = val[2:].decode('base64')
            if what == 'UnicodeDecodeError':
                raise UnicodeDecodeError(encoding, val, start, end, reason)
            elif what == 'UnicodeEncodeError':
                raise UnicodeEncodeError(encoding, val, start, end, reason)
        elif what == "ReadOnlyError":
            raise errors.TransportNotPossible('readonly transport')
        else:
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
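
    # Illustrative sketch (hypothetical value): in a Unicode*Error response
    # the offending value carries a 'u:' or 's:' prefix. After an 's:' prefix
    # the rest is base64-encoded bytes, which the client decodes with
    # Python 2's 'base64' codec:
    #
    #   'caf\xe9'.encode('base64')                  # -> 'Y2Fm6Q==\n'
    #   ('s:' + 'Y2Fm6Q==\n')[2:].decode('base64')  # -> 'caf\xe9'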

    def _send_tuple(self, args):
        self._client.accept_bytes(_encode_tuple(args))

    def _recv_tuple(self):
        return self._client._recv_tuple()

    def disconnect(self):
        self._client.disconnect()

    def delete_tree(self, relpath):
        raise errors.TransportNotPossible('readonly transport')

    def stat(self, relpath):
        resp = self._client._call('stat', self._remote_path(relpath))
        if resp[0] == 'stat':
            return SmartStat(int(resp[1]), int(resp[2], 8))
        else:
            self._translate_error(resp)
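
    # Illustrative sketch (hypothetical response; assumes SmartStat takes
    # (size, mode)): the first field is parsed as decimal and the second as
    # octal. A response of ('stat', '1024', '100644') therefore becomes
    # SmartStat(1024, int('100644', 8)). That mode is 33188 (0100644), a
    # regular file with rw-r--r-- permissions.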

    ## def lock_read(self, relpath):
    ##     """Lock the given file for shared (read) access.
    ##     :return: A lock object, which should be passed to Transport.unlock()
    ##     """
    ##     # The old RemoteBranch ignored locks for reading, so we will
    ##     # continue that tradition and return a bogus lock object.
    ##     class BogusLock(object):
    ##         def __init__(self, path):
    ##             self.path = path
    ##         def unlock(self):
    ##             pass
    ##     return BogusLock(relpath)

    def list_dir(self, relpath):
        resp = self._client._call('list_dir',
                                  self._remote_path(relpath))
        if resp[0] == 'names':
            return [name.encode('ascii') for name in resp[1:]]
        else:
            self._translate_error(resp)

    def iter_files_recursive(self):
        resp = self._client._call('iter_files_recursive',
                                  self._remote_path(''))
        if resp[0] == 'names':
            return resp[1:]
        else:
            self._translate_error(resp)


class SmartStreamClient(SmartProtocolBase):
    """Connection to smart server over two streams"""

    def _send_bulk_data(self, body):
        self._ensure_connection()
        SmartProtocolBase._send_bulk_data(self, body)

    def disconnect(self):
        """If this medium maintains a persistent connection, close it.

        The default implementation does nothing.
        """

    def _call(self, *args):
        bytes = _encode_tuple(args)
        # should be self.medium.accept_bytes(bytes) XXX
        self.accept_bytes(bytes)
        return self._recv_tuple()

    def _call_with_upload(self, method, args, body):
        """Call an rpc, supplying bulk upload data.

        :param method: method name to call
        :param args: parameter args tuple
        :param body: upload body as a byte string
        """
        bytes = _encode_tuple((method,) + args)
        bytes += self._encode_bulk_data(body)
        # should be self.medium.accept_bytes XXX
        self.accept_bytes(bytes)
        return self._recv_tuple()
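
    # Illustrative sketch, assuming _encode_tuple joins fields with Ctrl-A
    # ('\x01') and ends with '\n'. It also assumes _encode_bulk_data writes
    # the body as a single CHUNK plus the 'done' trailer from the module
    # docstring. Under those assumptions, the hypothetical call
    # _call_with_upload('append', ('/a/file', ''), 'hello') sends:
    #
    #   'append\x01/a/file\x01\n'    # request tuple
    #   '5\n' 'hello'                # CHUNK_LEN NEWLINE CHUNK_BODY
    #   'done\n'                     # SUCCESS_TRAILER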

    def query_version(self):
        """Return protocol version number of the server."""
        resp = self._call('hello')
        if resp == ('ok', '1'):
            return 1
        else:
            raise errors.SmartProtocolError("bad response %r" % (resp,))


class SmartClientMedium(SmartStreamClient):
    """A smart client medium: the channel over which smart protocol requests
    are sent.

    XXX: we want explicit finalisation
    """


class SmartStreamMediumClient(SmartClientMedium):
    """The SmartStreamMediumClient knows how to close the stream when it is
    finished with it.
    """


class SmartSimplePipesClientMedium(SmartClientMedium):
    """A client medium using simple pipes.

    This client does not manage the pipes: it assumes they will always be open.
    """

    def __init__(self, readable_pipe, writeable_pipe):
        self._readable_pipe = readable_pipe
        self._writeable_pipe = writeable_pipe

    def accept_bytes(self, bytes):
        """See SmartClientMedium.accept_bytes."""
        self._writeable_pipe.write(bytes)

    def read_bytes(self, count):
        """See SmartClientMedium.read_bytes."""
        return self._readable_pipe.read(count)

    def _recv_bulk(self):
        """transitional api from 'client' to 'medium'."""
        self._in = self._readable_pipe
        return SmartClientMedium._recv_bulk(self)

    def _recv_tuple(self):
        """transitional api from 'client' to 'medium'."""
        return _recv_tuple(self._readable_pipe)

    def _write_and_flush(self, bytes, file=None):
        """Thunk from the 'client' api to the 'Medium' api."""
        self.accept_bytes(bytes)


class SmartSSHClientMedium(SmartStreamMediumClient):
    """A client medium using SSH."""

    def __init__(self, host, port=None, username=None, password=None,
                 vendor=None):
        """Creates a client that will connect on the first use.

        :param vendor: An optional override for the ssh vendor to use. See
            bzrlib.transport.ssh for details on ssh vendors.
        """
        self._connected = False
        self._host = host
        self._password = password
        self._port = port
        self._username = username
        self._read_from = None
        self._ssh_connection = None
        self._vendor = vendor
        self._write_to = None

    def accept_bytes(self, bytes):
        """See SmartClientMedium.accept_bytes."""
        self._ensure_connection()
        self._write_to.write(bytes)
        self._write_to.flush()

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._read_from.close()
        self._write_to.close()
        self._ssh_connection.close()
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected."""
        if self._connected:
            return
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
        if self._vendor is None:
            vendor = ssh._get_ssh_vendor()
        else:
            vendor = self._vendor
        self._ssh_connection = vendor.connect_ssh(self._username,
                self._password, self._host, self._port,
                command=[executable, 'serve', '--inet', '--directory=/',
                         '--allow-writes'])
        self._read_from, self._write_to = \
            self._ssh_connection.get_filelike_channels()
        self._connected = True
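
    # Illustrative usage sketch (hypothetical host, user and path): the remote
    # executable defaults to 'bzr' and can be overridden through the
    # BZR_REMOTE_PATH environment variable. That variable is read when the
    # connection is opened, and nothing connects until the medium is first
    # used:
    #
    #   os.environ['BZR_REMOTE_PATH'] = '/opt/bzr/bin/bzr'
    #   medium = SmartSSHClientMedium('example.com', username='alice')
    #   medium.query_version()   # first request runs _ensure_connection()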

    def read_bytes(self, count):
        """See SmartClientMedium.read_bytes."""
        raise errors.MediumNotConnected(self)

    def _recv_bulk(self):
        """transitional api from 'client' to 'medium'."""
        self._in = self._read_from
        return SmartStreamMediumClient._recv_bulk(self)

    def _recv_tuple(self):
        """transitional api from 'client' to 'medium'."""
        return _recv_tuple(self._read_from)

    def _write_and_flush(self, bytes, file=None):
        """Thunk from the 'client' api to the 'Medium' api."""
        self.accept_bytes(bytes)


class SmartTCPClientMedium(SmartStreamMediumClient):
    """A client medium using TCP."""

    def __init__(self, host, port):
        """Creates a client that will connect on the first use."""
        self._connected = False
        self._host = host
        self._port = port

    def accept_bytes(self, bytes):
        """See SmartClientMedium.accept_bytes."""
        self._ensure_connection()
        self._socket.sendall(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._socket.close()
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected."""
        if self._connected:
            return
        self._socket = socket.socket()
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # connect_ex returns 0 on success and an errno value on failure
        result = self._socket.connect_ex((self._host, int(self._port)))
        if result:
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
                    (self._host, self._port, os.strerror(result)))
        self._connected = True

    def read_bytes(self, count):
        """See SmartClientMedium.read_bytes."""
        raise errors.MediumNotConnected(self)

    def _recv_bulk(self):
        """transitional api from 'client' to 'medium'."""
        self._in = self._socket.makefile('r', 0)
        return SmartStreamMediumClient._recv_bulk(self)

    def _recv_tuple(self):
        """transitional api from 'client' to 'medium'."""
        return _recv_tuple(self._socket.makefile('r', 0))

    def _write_and_flush(self, bytes, file=None):
        """Thunk from the 'client' api to the 'Medium' api."""
        self.accept_bytes(bytes)


class SmartTCPTransport(SmartTransport):
    """Connection to smart server over plain tcp.

    This is essentially just a factory to get
    'RemoteTransport(url, SmartTCPClientMedium)'.
    """

    def __init__(self, url):
        _scheme, _username, _password, _host, _port, _path = \
            transport.split_url(url)
        try:
            _port = int(_port)
        except (ValueError, TypeError), e:
            raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
        medium = SmartTCPClientMedium(_host, _port)
        super(SmartTCPTransport, self).__init__(url, medium=medium)
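
    # Illustrative usage sketch (hypothetical URL and paths; the port is given
    # explicitly): the connection is opened lazily by the first request, and
    # clones share the same medium:
    #
    #   t = SmartTCPTransport('bzr://localhost:4155/')
    #   t.has('README')          # connects, then sends a 'has' request
    #   sub = t.clone('subdir')  # reuses t's connection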


try:
    from bzrlib.transport import sftp, ssh
except errors.ParamikoNotPresent:
    # no paramiko, no SSHTransport.
    pass
else:
    class SmartSSHTransport(SmartTransport):
        """Connection to smart server over SSH.

        This is essentially just a factory to get
        'RemoteTransport(url, SmartSSHClientMedium)'.
        """

        def __init__(self, url):
            _scheme, _username, _password, _host, _port, _path = \
                transport.split_url(url)
            try:
                if _port is not None:
                    _port = int(_port)
            except (ValueError, TypeError), e:
                raise errors.InvalidURL(path=url, extra="invalid port %s" %
                    _port)
            medium = SmartSSHClientMedium(_host, _port, _username, _password)
            super(SmartSSHTransport, self).__init__(url, medium=medium)


def get_test_permutations():
    """Return (transport, server) permutations for testing."""
    ### We may need a little more test framework support to construct an
    ### appropriate RemoteTransport in the future.
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]