1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Smart-server protocol, client and server.
19
Requests are sent as a command and list of arguments, followed by optional
20
bulk body data. Responses are similarly a response and list of arguments,
21
followed by bulk body data. ::
24
Fields are separated by Ctrl-A.
25
BULK_DATA := CHUNK+ TRAILER
26
Chunks can be repeated as many times as necessary.
27
CHUNK := CHUNK_LEN CHUNK_BODY
28
CHUNK_LEN := DIGIT+ NEWLINE
29
Gives the number of bytes in the following chunk.
30
CHUNK_BODY := BYTE[chunk_len]
31
TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
32
SUCCESS_TRAILER := 'done' NEWLINE
35
Paths are passed across the network. The client needs to see a namespace that
36
includes any repository that might need to be referenced, and the client needs
37
to know about a root directory beyond which it cannot ascend.
39
Servers run over ssh will typically want to be able to access any path the user
40
can access. Public servers on the other hand (which might be over http, ssh
41
or tcp) will typically want to restrict access to only a particular directory
42
and its children, so will want to do a software virtual root at that level.
43
In other words they'll want to rewrite incoming paths to be under that level
44
(and prevent escaping using ../ tricks.)
46
URLs that include ~ should probably be passed across to the server verbatim
47
and the server can expand them. This will proably not be meaningful when
48
limited to a directory?
50
At the bottom level socket, pipes, HTTP server. For sockets, we have the
51
idea that you have multiple requests and get have a read error because the
52
other side did shutdown sd send. For pipes we have read pipe which will have a
53
zero read which marks end-of-file. For HTTP server environment there is not
54
end-of-stream because each request coming into the server is independent.
56
So we need a wrapper around pipes and sockets to seperate out reqeusts from
57
substrate and this will give us a single model which is consist for HTTP,
63
MEDIUM (factory for protocol, reads bytes & pushes to protocol,
64
uses protocol to detect end-of-request, sends written
65
bytes to client) e.g. socket, pipe, HTTP request handler.
70
PROTOCOL (serialisation, deserialisation) accepts bytes for one
71
request, decodes according to internal state, pushes
72
structured data to handler. accepts structured data from
73
handler and encodes and writes to the medium. factory for
79
HANDLER (domain logic) accepts structured data, operates state
80
machine until the request can be satisfied,
81
sends structured data to the protocol.
87
CLIENT domain logic, accepts domain requests, generated structured
88
data, reads structured data from responses and turns into
89
domain data. Sends structured data to the protocol.
90
Operates state machines until the request can be delivered
91
(e.g. reading from a bundle generated in bzrlib to deliver a
94
Possibly this should just be RemoteBzrDir, RemoteTransport,
100
PROTOCOL (serialisation, deserialisation) accepts structured data for one
101
request, encodes and writes to the medium. Reads bytes from the
102
medium, decodes and allows the client to read structured data.
107
MEDIUM (accepts bytes from the protocol & delivers to the remote server.
108
Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
112
# TODO: _translate_error should be on the client, not the transport because
113
# error coding is wire protocol specific.
115
# TODO: A plain integer from query_version is too simple; should give some
118
# TODO: Server should probably catch exceptions within itself and send them
119
# back across the network. (But shouldn't catch KeyboardInterrupt etc)
120
# Also needs to somehow report protocol errors like bad requests. Need to
121
# consider how we'll handle error reporting, e.g. if we get halfway through a
122
# bulk transfer and then something goes wrong.
124
# TODO: Standard marker at start of request/response lines?
126
# TODO: Make each request and response self-validatable, e.g. with checksums.
128
# TODO: get/put objects could be changed to gradually read back the data as it
129
# comes across the network
131
# TODO: What should the server do if it hits an error and has to terminate?
133
# TODO: is it useful to allow multiple chunks in the bulk data?
135
# TODO: If we get an exception during transmission of bulk data we can't just
136
# emit the exception because it won't be seen.
137
# John proposes: I think it would be worthwhile to have a header on each
138
# chunk, that indicates it is another chunk. Then you can send an 'error'
139
# chunk as long as you finish the previous chunk.
141
# TODO: Clone method on Transport; should work up towards parent directory;
142
# unclear how this should be stored or communicated to the server... maybe
143
# just pass it on all relevant requests?
145
# TODO: Better name than clone() for changing between directories. How about
146
# open_dir or change_dir or chdir?
148
# TODO: Is it really good to have the notion of current directory within the
149
# connection? Perhaps all Transports should factor out a common connection
150
# from the thing that has the directory context?
152
# TODO: Pull more things common to sftp and ssh to a higher level.
154
# TODO: The server that manages a connection should be quite small and retain
155
# minimum state because each of the requests are supposed to be stateless.
156
# Then we can write another implementation that maps to http.
158
# TODO: What to do when a client connection is garbage collected? Maybe just
159
# abruptly drop the connection?
161
# TODO: Server in some cases will need to restrict access to files outside of
162
# a particular root directory. LocalTransport doesn't do anything to stop you
163
# ascending above the base directory, so we need to prevent paths
164
# containing '..' in either the server or transport layers. (Also need to
165
# consider what happens if someone creates a symlink pointing outside the
168
# TODO: Server should rebase absolute paths coming across the network to put
169
# them under the virtual root, if one is in use. LocalTransport currently
170
# doesn't do that; if you give it an absolute path it just uses it.
172
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
173
# urlescape them instead. Indeed possibly this should just literally be
176
# FIXME: This transport, with several others, has imperfect handling of paths
177
# within urls. It'd probably be better for ".." from a root to raise an error
178
# rather than return the same directory as we do at present.
180
# TODO: Rather than working at the Transport layer we want a Branch,
181
# Repository or BzrDir objects that talk to a server.
183
# TODO: Probably want some way for server commands to gradually produce body
184
# data rather than passing it as a string; they could perhaps pass an
185
# iterator-like callback that will gradually yield data; it probably needs a
186
# close() method that will always be closed to do any necessary cleanup.
188
# TODO: Split the actual smart server from the ssh encoding of it.
190
# TODO: Perhaps support file-level readwrite operations over the transport
193
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
194
# branch doing file-level operations.
196
# TODO: jam 20060915 _decode_tuple is acting directly on input over
197
# the socket, and it assumes everything is UTF8 sections separated
198
# by \001. Which means a request like '\002' Will abort the connection
199
# because of a UnicodeDecodeError. It does look like invalid data will
200
# kill the SmartServerStreamMedium, but only with an abort + exception, and
201
# the overall server shouldn't die.
203
from cStringIO import StringIO
219
from bzrlib.bundle.serializer import write_bundle
221
# must do this otherwise urllib can't parse the urls properly :(
222
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
223
transport.register_urlparse_netloc_protocol(scheme)
227
def _recv_tuple(from_file):
228
req_line = from_file.readline()
229
return _decode_tuple(req_line)
232
def _decode_tuple(req_line):
233
if req_line == None or req_line == '':
235
if req_line[-1] != '\n':
236
raise errors.SmartProtocolError("request %r not terminated" % req_line)
237
return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
240
def _encode_tuple(args):
241
"""Encode the tuple args to a bytestream."""
242
return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
245
class SmartProtocolBase(object):
246
"""Methods common to client and server"""
248
# TODO: this only actually accomodates a single block; possibly should support
250
def _recv_bulk(self):
251
# This is OBSOLETE except for the double handline in the server:
252
# the read_bulk + reencode noise.
253
chunk_len = self._in.readline()
255
chunk_len = int(chunk_len)
257
raise errors.SmartProtocolError("bad chunk length line %r" % chunk_len)
258
bulk = self._in.read(chunk_len)
259
if len(bulk) != chunk_len:
260
raise errors.SmartProtocolError("short read fetching bulk data chunk")
264
def _recv_trailer(self):
265
resp = self._recv_tuple()
266
if resp == ('done', ):
269
self._translate_error(resp)
271
def _encode_bulk_data(self, body):
272
"""Encode body as a bulk data chunk."""
273
return ''.join(('%d\n' % len(body), body, 'done\n'))
275
def _serialise_offsets(self, offsets):
276
"""Serialise a readv offset list."""
278
for start, length in offsets:
279
txt.append('%d,%d' % (start, length))
280
return '\n'.join(txt)
282
def _send_bulk_data(self, body, a_file=None):
283
"""Send chunked body data"""
284
assert isinstance(body, str)
285
bytes = self._encode_bulk_data(body)
286
self._write_and_flush(bytes, a_file)
288
def _write_and_flush(self, bytes, a_file=None):
289
"""Write bytes to self._out and flush it."""
290
# XXX: this will be inefficient. Just ask Robert.
297
class SmartServerRequestProtocolOne(SmartProtocolBase):
298
"""Server-side encoding and decoding logic for smart version 1."""
300
def __init__(self, output_stream, backing_transport):
301
self._out_stream = output_stream
302
self._backing_transport = backing_transport
303
self.excess_buffer = ''
304
self.finished_reading = False
306
self.has_dispatched = False
308
self._body_decoder = None
310
def accept_bytes(self, bytes):
311
"""Take bytes, and advance the internal state machine appropriately.
313
:param bytes: must be a byte string
315
assert isinstance(bytes, str)
316
self.in_buffer += bytes
317
if not self.has_dispatched:
318
if '\n' not in self.in_buffer:
319
# no command line yet
321
self.has_dispatched = True
322
# XXX if in_buffer not \n-terminated this will do the wrong
325
first_line, self.in_buffer = self.in_buffer.split('\n', 1)
327
req_args = _decode_tuple(first_line)
328
self.request = SmartServerRequestHandler(
329
self._backing_transport)
330
self.request.dispatch_command(req_args[0], req_args[1:])
331
if self.request.finished_reading:
333
self.excess_buffer = self.in_buffer
335
self._send_response(self.request.response.args,
336
self.request.response.body)
337
self.sync_with_request(self.request)
339
except KeyboardInterrupt:
341
except Exception, exception:
342
# everything else: pass to client, flush, and quit
343
self._send_response(('error', str(exception)))
347
if self.finished_reading:
348
# nothing to do.XXX: this routine should be a single state
350
self.excess_buffer += self.in_buffer
353
if self._body_decoder is None:
354
self._body_decoder = LengthPrefixedBodyDecoder()
355
self._body_decoder.accept_bytes(self.in_buffer)
356
self.in_buffer = self._body_decoder.unused_data
357
body_data = self._body_decoder.read_pending_data()
358
self.request.accept_body(body_data)
359
if self._body_decoder.finished_reading:
360
self.request.end_of_body()
361
assert self.request.finished_reading, \
362
"no more body, request not finished"
363
self.sync_with_request(self.request)
364
if self.request.response is not None:
365
self._send_response(self.request.response.args,
366
self.request.response.body)
367
self.excess_buffer = self.in_buffer
370
assert not self.request.finished_reading, \
371
"no response and we have finished reading."
373
def _send_response(self, args, body=None):
374
"""Send a smart server response down the output stream."""
375
self._out_stream.write(_encode_tuple(args))
377
self._out_stream.flush()
379
self._send_bulk_data(body, self._out_stream)
380
#self._out_stream.write('BLARGH')
382
def sync_with_request(self, request):
383
self.finished_reading = request.finished_reading
386
class LengthPrefixedBodyDecoder(object):
387
"""Decodes the length-prefixed bulk data."""
390
self.bytes_left = None
391
self.finished_reading = False
392
self.unused_data = ''
393
self.state_accept = self._state_accept_expecting_length
394
self.state_read = self._state_read_no_data
396
self._trailer_buffer = ''
398
def accept_bytes(self, bytes):
399
"""Decode as much of bytes as possible.
401
If 'bytes' contains too much data it will be appended to
404
finished_reading will be set when no more data is required. Further
405
data will be appended to self.unused_data.
407
# accept_bytes is allowed to change the state
408
current_state = self.state_accept
409
self.state_accept(bytes)
410
while current_state != self.state_accept:
411
current_state = self.state_accept
412
self.state_accept('')
414
def next_read_size(self):
415
if self.bytes_left is not None:
416
# Ideally we want to read all the remainder of the body and the
418
return self.bytes_left + 5
419
elif self.state_accept == self._state_accept_reading_trailer:
420
# Just the trailer left
421
return 5 - len(self._trailer_buffer)
422
elif self.state_accept == self._state_accept_expecting_length:
423
# There's still at least 6 bytes left ('\n' to end the length, plus
427
# Reading excess data. Either way, 1 byte at a time is fine.
430
def read_pending_data(self):
431
"""Return any pending data that has been decoded."""
432
return self.state_read()
434
def _state_accept_expecting_length(self, bytes):
435
self._in_buffer += bytes
436
pos = self._in_buffer.find('\n')
439
self.bytes_left = int(self._in_buffer[:pos])
440
self._in_buffer = self._in_buffer[pos+1:]
441
self.bytes_left -= len(self._in_buffer)
442
self.state_accept = self._state_accept_reading_body
443
self.state_read = self._state_read_in_buffer
445
def _state_accept_reading_body(self, bytes):
446
self._in_buffer += bytes
447
self.bytes_left -= len(bytes)
448
if self.bytes_left <= 0:
450
if self.bytes_left != 0:
451
self._trailer_buffer = self._in_buffer[self.bytes_left:]
452
self._in_buffer = self._in_buffer[:self.bytes_left]
453
self.bytes_left = None
454
self.state_accept = self._state_accept_reading_trailer
456
def _state_accept_reading_trailer(self, bytes):
457
self._trailer_buffer += bytes
458
# TODO: what if the trailer does not match "done\n"? Should this raise
459
# a ProtocolViolation exception?
460
if self._trailer_buffer.startswith('done\n'):
461
self.unused_data = self._trailer_buffer[len('done\n'):]
462
self.state_accept = self._state_accept_reading_unused
463
self.finished_reading = True
465
def _state_accept_reading_unused(self, bytes):
466
self.unused_data += bytes
468
def _state_read_no_data(self):
471
def _state_read_in_buffer(self):
472
result = self._in_buffer
477
class SmartServerStreamMedium(SmartProtocolBase):
478
"""Handles smart commands coming over a stream.
480
The stream may be a pipe connected to sshd, or a tcp socket, or an
481
in-process fifo for testing.
483
One instance is created for each connected client; it can serve multiple
484
requests in the lifetime of the connection.
486
The server passes requests through to an underlying backing transport,
487
which will typically be a LocalTransport looking at the server's filesystem.
490
def __init__(self, in_file, out_file, backing_transport):
491
"""Construct new server.
493
:param in_file: Python file from which requests can be read.
494
:param out_file: Python file to write responses.
495
:param backing_transport: Transport for the directory served.
499
self.backing_transport = backing_transport
501
def _recv_tuple(self):
502
"""Read a request from the client and return as a tuple.
504
Returns None at end of file (if the client closed the connection.)
506
# ** Deserialise and read bytes
507
return _recv_tuple(self._in)
509
def _send_tuple(self, args):
510
"""Send response header"""
511
# ** serialise and write bytes
512
return self._write_and_flush(_encode_tuple(args))
514
def _send_error_and_disconnect(self, exception):
515
# ** serialise and write bytes
516
self._send_tuple(('error', str(exception)))
520
def _serve_one_request(self):
521
"""Read one request from input, process, send back a response.
523
:return: False if the server should terminate, otherwise None.
525
# ** deserialise, read bytes, serialise and write bytes
526
req_line = self._in.readline()
527
# this should just test "req_line == ''", surely? -- Andrew Bennetts
528
if req_line in ('', None):
529
# client closed connection
530
return False # shutdown server
532
protocol = SmartServerRequestProtocolOne(self._out,
533
self.backing_transport)
534
protocol.accept_bytes(req_line)
535
if not protocol.finished_reading:
536
# this boils down to readline which wont block on open sockets
537
# without data. We should really though read as much as is
538
# available and then hand to that accept_bytes without this
539
# silly double-decode.
540
bulk = self._recv_bulk()
541
bulk_bytes = ''.join(('%d\n' % len(bulk), bulk, 'done\n'))
542
protocol.accept_bytes(bulk_bytes)
543
# might be nice to do protocol.end_of_bytes()
544
# because self._recv_bulk reads all the bytes, this must finish
545
# after one delivery of data rather than looping.
546
assert protocol.finished_reading
547
except KeyboardInterrupt:
550
# everything else: pass to client, flush, and quit
551
self._send_error_and_disconnect(e)
555
"""Serve requests until the client disconnects."""
556
# Keep a reference to stderr because the sys module's globals get set to
557
# None during interpreter shutdown.
558
from sys import stderr
560
while self._serve_one_request() != False:
563
stderr.write("%s terminating on exception %s\n" % (self, e))
567
class SmartServerResponse(object):
568
"""Response generated by SmartServerRequestHandler."""
570
def __init__(self, args, body=None):
574
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
575
# for delivering the data for a request. This could be done with as the
576
# StreamServer, though that would create conflation between request and response
577
# which may be undesirable.
580
class SmartServerRequestHandler(object):
581
"""Protocol logic for smart server.
583
This doesn't handle serialization at all, it just processes requests and
587
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
588
# not contain encoding or decoding logic to allow the wire protocol to vary
589
# from the object protocol: we will want to tweak the wire protocol separate
590
# from the object model, and ideally we will be able to do that without
591
# having a SmartServerRequestHandler subclass for each wire protocol, rather
592
# just a Protocol subclass.
594
# TODO: Better way of representing the body for commands that take it,
595
# and allow it to be streamed into the server.
597
def __init__(self, backing_transport):
598
self._backing_transport = backing_transport
599
self._converted_command = False
600
self.finished_reading = False
601
self._body_bytes = ''
604
def accept_body(self, bytes):
607
This should be overriden for each command that desired body data to
608
handle the right format of that data. I.e. plain bytes, a bundle etc.
610
The deserialisation into that format should be done in the Protocol
611
object. Set self.desired_body_format to the format your method will
614
# default fallback is to accumulate bytes.
615
self._body_bytes += bytes
617
def _end_of_body_handler(self):
618
"""An unimplemented end of body handler."""
619
raise NotImplementedError(self._end_of_body_handler)
622
"""Answer a version request with my version."""
623
return SmartServerResponse(('ok', '1'))
625
def do_has(self, relpath):
626
r = self._backing_transport.has(relpath) and 'yes' or 'no'
627
return SmartServerResponse((r,))
629
def do_get(self, relpath):
630
backing_bytes = self._backing_transport.get_bytes(relpath)
631
return SmartServerResponse(('ok',), backing_bytes)
633
def _deserialise_optional_mode(self, mode):
634
# XXX: FIXME this should be on the protocol object.
640
def do_append(self, relpath, mode):
641
self._converted_command = True
642
self._relpath = relpath
643
self._mode = self._deserialise_optional_mode(mode)
644
self._end_of_body_handler = self._handle_do_append_end
646
def _handle_do_append_end(self):
647
old_length = self._backing_transport.append_bytes(
648
self._relpath, self._body_bytes, self._mode)
649
self.response = SmartServerResponse(('appended', '%d' % old_length))
651
def do_delete(self, relpath):
652
self._backing_transport.delete(relpath)
654
def do_iter_files_recursive(self, abspath):
655
# XXX: the path handling needs some thought.
656
#relpath = self._backing_transport.relpath(abspath)
657
transport = self._backing_transport.clone(abspath)
658
filenames = transport.iter_files_recursive()
659
return SmartServerResponse(('names',) + tuple(filenames))
661
def do_list_dir(self, relpath):
662
filenames = self._backing_transport.list_dir(relpath)
663
return SmartServerResponse(('names',) + tuple(filenames))
665
def do_mkdir(self, relpath, mode):
666
self._backing_transport.mkdir(relpath,
667
self._deserialise_optional_mode(mode))
669
def do_move(self, rel_from, rel_to):
670
self._backing_transport.move(rel_from, rel_to)
672
def do_put(self, relpath, mode):
673
self._converted_command = True
674
self._relpath = relpath
675
self._mode = self._deserialise_optional_mode(mode)
676
self._end_of_body_handler = self._handle_do_put
678
def _handle_do_put(self):
679
self._backing_transport.put_bytes(self._relpath,
680
self._body_bytes, self._mode)
681
self.response = SmartServerResponse(('ok',))
683
def _deserialise_offsets(self, text):
684
# XXX: FIXME this should be on the protocol object.
686
for line in text.split('\n'):
689
start, length = line.split(',')
690
offsets.append((int(start), int(length)))
693
def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
694
self._converted_command = True
695
self._end_of_body_handler = self._handle_put_non_atomic
696
self._relpath = relpath
697
self._dir_mode = self._deserialise_optional_mode(dir_mode)
698
self._mode = self._deserialise_optional_mode(mode)
699
# a boolean would be nicer XXX
700
self._create_parent = (create_parent == 'T')
702
def _handle_put_non_atomic(self):
703
self._backing_transport.put_bytes_non_atomic(self._relpath,
706
create_parent_dir=self._create_parent,
707
dir_mode=self._dir_mode)
708
self.response = SmartServerResponse(('ok',))
710
def do_readv(self, relpath):
711
self._converted_command = True
712
self._end_of_body_handler = self._handle_readv_offsets
713
self._relpath = relpath
715
def end_of_body(self):
716
"""No more body data will be received."""
717
self._run_handler_code(self._end_of_body_handler, (), {})
718
# cannot read after this.
719
self.finished_reading = True
721
def _handle_readv_offsets(self):
722
"""accept offsets for a readv request."""
723
offsets = self._deserialise_offsets(self._body_bytes)
724
backing_bytes = ''.join(bytes for offset, bytes in
725
self._backing_transport.readv(self._relpath, offsets))
726
self.response = SmartServerResponse(('readv',), backing_bytes)
728
def do_rename(self, rel_from, rel_to):
729
self._backing_transport.rename(rel_from, rel_to)
731
def do_rmdir(self, relpath):
732
self._backing_transport.rmdir(relpath)
734
def do_stat(self, relpath):
735
stat = self._backing_transport.stat(relpath)
736
return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
738
def do_get_bundle(self, path, revision_id):
739
# open transport relative to our base
740
t = self._backing_transport.clone(path)
741
control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
742
repo = control.open_repository()
743
tmpf = tempfile.TemporaryFile()
744
base_revision = revision.NULL_REVISION
745
write_bundle(repo, revision_id, base_revision, tmpf)
747
return SmartServerResponse((), tmpf.read())
749
def dispatch_command(self, cmd, args):
750
"""Deprecated compatibility method.""" # XXX XXX
751
func = getattr(self, 'do_' + cmd, None)
753
raise errors.SmartProtocolError("bad request %r" % (cmd,))
754
self._run_handler_code(func, args, {})
756
def _run_handler_code(self, callable, args, kwargs):
757
"""Run some handler specific code 'callable'.
759
If a result is returned, it is considered to be the commands response,
760
and finished_reading is set true, and its assigned to self.response.
762
Any exceptions caught are translated and a response object created
765
result = self._call_converting_errors(callable, args, kwargs)
766
if result is not None:
767
self.response = result
768
self.finished_reading = True
769
# handle unconverted commands
770
if not self._converted_command:
771
self.finished_reading = True
773
self.response = SmartServerResponse(('ok',))
775
def _call_converting_errors(self, callable, args, kwargs):
776
"""Call callable converting errors to Response objects."""
778
return callable(*args, **kwargs)
779
except errors.NoSuchFile, e:
780
return SmartServerResponse(('NoSuchFile', e.path))
781
except errors.FileExists, e:
782
return SmartServerResponse(('FileExists', e.path))
783
except errors.DirectoryNotEmpty, e:
784
return SmartServerResponse(('DirectoryNotEmpty', e.path))
785
except errors.ShortReadvError, e:
786
return SmartServerResponse(('ShortReadvError',
787
e.path, str(e.offset), str(e.length), str(e.actual)))
788
except UnicodeError, e:
789
# If it is a DecodeError, than most likely we are starting
790
# with a plain string
791
str_or_unicode = e.object
792
if isinstance(str_or_unicode, unicode):
793
val = u'u:' + str_or_unicode
795
val = u's:' + str_or_unicode.encode('base64')
796
# This handles UnicodeEncodeError or UnicodeDecodeError
797
return SmartServerResponse((e.__class__.__name__,
798
e.encoding, val, str(e.start), str(e.end), e.reason))
799
except errors.TransportNotPossible, e:
800
if e.msg == "readonly transport":
801
return SmartServerResponse(('ReadOnlyError', ))
806
class SmartTCPServer(object):
807
"""Listens on a TCP socket and accepts connections from smart clients"""
809
def __init__(self, backing_transport, host='127.0.0.1', port=0):
810
"""Construct a new server.
812
To actually start it running, call either start_background_thread or
815
:param host: Name of the interface to listen on.
816
:param port: TCP port to listen on, or 0 to allocate a transient port.
818
self._server_socket = socket.socket()
819
self._server_socket.bind((host, port))
820
self.port = self._server_socket.getsockname()[1]
821
self._server_socket.listen(1)
822
self._server_socket.settimeout(1)
823
self.backing_transport = backing_transport
826
# let connections timeout so that we get a chance to terminate
827
# Keep a reference to the exceptions we want to catch because the socket
828
# module's globals get set to None during interpreter shutdown.
829
from socket import timeout as socket_timeout
830
from socket import error as socket_error
831
self._should_terminate = False
832
while not self._should_terminate:
834
self.accept_and_serve()
835
except socket_timeout:
836
# just check if we're asked to stop
838
except socket_error, e:
839
trace.warning("client disconnected: %s", e)
843
"""Return the url of the server"""
844
return "bzr://%s:%d/" % self._server_socket.getsockname()
846
def accept_and_serve(self):
847
conn, client_addr = self._server_socket.accept()
848
# For WIN32, where the timeout value from the listening socket
849
# propogates to the newly accepted socket.
850
conn.setblocking(True)
851
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
852
from_client = conn.makefile('r')
853
to_client = conn.makefile('w')
854
handler = SmartServerStreamMedium(from_client, to_client,
855
self.backing_transport)
856
connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
857
connection_thread.setDaemon(True)
858
connection_thread.start()
860
def start_background_thread(self):
861
self._server_thread = threading.Thread(None,
863
name='server-' + self.get_url())
864
self._server_thread.setDaemon(True)
865
self._server_thread.start()
867
def stop_background_thread(self):
868
self._should_terminate = True
869
# self._server_socket.close()
870
# we used to join the thread, but it's not really necessary; it will
872
## self._server_thread.join()
875
class SmartTCPServer_for_testing(SmartTCPServer):
876
"""Server suitable for use by transport tests.
878
This server is backed by the process's cwd.
882
self._homedir = os.getcwd()
883
# The server is set up by default like for ssh access: the client
884
# passes filesystem-absolute paths; therefore the server must look
885
# them up relative to the root directory. it might be better to act
886
# a public server and have the server rewrite paths into the test
888
SmartTCPServer.__init__(self, transport.get_transport("file:///"))
891
"""Set up server for testing"""
892
self.start_background_thread()
895
self.stop_background_thread()
898
"""Return the url of the server"""
899
host, port = self._server_socket.getsockname()
900
# XXX: I think this is likely to break on windows -- self._homedir will
901
# have backslashes (and maybe a drive letter?).
902
# -- Andrew Bennetts, 2006-08-29
903
return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
905
def get_bogus_url(self):
906
"""Return a URL which will fail to connect"""
907
return 'bzr://127.0.0.1:1/'
910
class SmartStat(object):
912
def __init__(self, size, mode):
917
class SmartTransport(transport.Transport):
918
"""Connection to a smart server.
920
The connection holds references to pipes that can be used to send requests
923
The connection has a notion of the current directory to which it's
924
connected; this is incorporated in filenames passed to the server.
926
This supports some higher-level RPC operations and can also be treated
927
like a Transport to do file-like operations.
929
The connection can be made over a tcp socket, or (in future) an ssh pipe
930
or a series of http requests. There are concrete subclasses for each
931
type: SmartTCPTransport, etc.
934
# IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
935
# responsibilities: Put those on SmartClient or similar. This is vital for
936
# the ability to support multiple versions of the smart protocol over time:
937
# SmartTransport is an adapter from the Transport object model to the
938
# SmartClient model, not an encoder.
940
def __init__(self, url, clone_from=None, medium=None):
943
:param medium: The medium to use for this RemoteTransport. This must be
944
supplied if clone_from is None.
946
### Technically super() here is faulty because Transport's __init__
947
### fails to take 2 parameters, and if super were to choose a silly
948
### initialisation order things would blow up.
949
if not url.endswith('/'):
951
super(SmartTransport, self).__init__(url)
952
self._scheme, self._username, self._password, self._host, self._port, self._path = \
953
transport.split_url(url)
954
if clone_from is None:
955
self._medium = medium
957
# credentials may be stripped from the base in some circumstances
958
# as yet to be clearly defined or documented, so copy them.
959
self._username = clone_from._username
960
# reuse same connection
961
self._medium = clone_from._medium
962
assert self._medium is not None
964
def abspath(self, relpath):
965
"""Return the full url to the given relative path.
967
@param relpath: the relative path or path components
968
@type relpath: str or list
970
return self._unparse_url(self._remote_path(relpath))
972
def clone(self, relative_url):
973
"""Make a new SmartTransport related to me, sharing the same connection.
975
This essentially opens a handle on a different remote directory.
977
if relative_url is None:
978
return SmartTransport(self.base, self)
980
return SmartTransport(self.abspath(relative_url), self)
982
def is_readonly(self):
983
"""Smart server transport can do read/write file operations."""
986
def get_smart_client(self):
989
def get_smart_medium(self):
992
def _unparse_url(self, path):
993
"""Return URL for a path.
995
:see: SFTPUrlHandling._unparse_url
997
# TODO: Eventually it should be possible to unify this with
998
# SFTPUrlHandling._unparse_url?
1001
path = urllib.quote(path)
1002
netloc = urllib.quote(self._host)
1003
if self._username is not None:
1004
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
1005
if self._port is not None:
1006
netloc = '%s:%d' % (netloc, self._port)
1007
return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
1009
def _remote_path(self, relpath):
1010
"""Returns the Unicode version of the absolute path for relpath."""
1011
return self._combine_paths(self._path, relpath)
1013
def _call(self, method, *args):
1014
resp = self._call2(method, *args)
1015
self._translate_error(resp)
1017
def _call2(self, method, *args):
1018
"""Call a method on the remote server."""
1019
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1020
protocol.call(method, *args)
1021
return protocol.read_response_tuple()
1023
def _call_with_body_bytes(self, method, args, body):
1024
"""Call a method on the remote server with body bytes."""
1025
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1026
protocol.call_with_body_bytes((method, ) + args, body)
1027
return protocol.read_response_tuple()
1029
def has(self, relpath):
1030
"""Indicate whether a remote file of the given name exists or not.
1032
:see: Transport.has()
1034
resp = self._call2('has', self._remote_path(relpath))
1035
if resp == ('yes', ):
1037
elif resp == ('no', ):
1040
self._translate_error(resp)
1042
def get(self, relpath):
1043
"""Return file-like object reading the contents of a remote file.
1045
:see: Transport.get_bytes()/get_file()
1047
return StringIO(self.get_bytes(relpath))
1049
def get_bytes(self, relpath):
1050
remote = self._remote_path(relpath)
1051
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1052
protocol.call('get', remote)
1053
resp = protocol.read_response_tuple(True)
1054
if resp != ('ok', ):
1055
protocol.cancel_read_body()
1056
self._translate_error(resp, relpath)
1057
return protocol.read_body_bytes()
1059
def _serialise_optional_mode(self, mode):
1065
def mkdir(self, relpath, mode=None):
1066
resp = self._call2('mkdir', self._remote_path(relpath),
1067
self._serialise_optional_mode(mode))
1068
self._translate_error(resp)
1070
def put_bytes(self, relpath, upload_contents, mode=None):
1071
# FIXME: upload_file is probably not safe for non-ascii characters -
1072
# should probably just pass all parameters as length-delimited
1074
resp = self._call_with_body_bytes('put',
1075
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
1077
self._translate_error(resp)
1079
def put_bytes_non_atomic(self, relpath, bytes, mode=None,
1080
create_parent_dir=False,
1082
"""See Transport.put_bytes_non_atomic."""
1083
# FIXME: no encoding in the transport!
1084
create_parent_str = 'F'
1085
if create_parent_dir:
1086
create_parent_str = 'T'
1088
resp = self._call_with_body_bytes(
1090
(self._remote_path(relpath), self._serialise_optional_mode(mode),
1091
create_parent_str, self._serialise_optional_mode(dir_mode)),
1093
self._translate_error(resp)
1095
def put_file(self, relpath, upload_file, mode=None):
1096
# its not ideal to seek back, but currently put_non_atomic_file depends
1097
# on transports not reading before failing - which is a faulty
1098
# assumption I think - RBC 20060915
1099
pos = upload_file.tell()
1101
return self.put_bytes(relpath, upload_file.read(), mode)
1103
upload_file.seek(pos)
1106
def put_file_non_atomic(self, relpath, f, mode=None,
1107
create_parent_dir=False,
1109
return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
1110
create_parent_dir=create_parent_dir,
1113
def append_file(self, relpath, from_file, mode=None):
1114
return self.append_bytes(relpath, from_file.read(), mode)
1116
def append_bytes(self, relpath, bytes, mode=None):
1117
resp = self._call_with_body_bytes(
1119
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
1121
if resp[0] == 'appended':
1123
self._translate_error(resp)
1125
def delete(self, relpath):
1126
resp = self._call2('delete', self._remote_path(relpath))
1127
self._translate_error(resp)
1129
def readv(self, relpath, offsets):
1133
offsets = list(offsets)
1135
sorted_offsets = sorted(offsets)
1136
# turn the list of offsets into a stack
1137
offset_stack = iter(offsets)
1138
cur_offset_and_size = offset_stack.next()
1139
coalesced = list(self._coalesce_offsets(sorted_offsets,
1140
limit=self._max_readv_combine,
1141
fudge_factor=self._bytes_to_read_before_seek))
1143
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1144
protocol.call_with_body_readv_array(
1145
('readv', self._remote_path(relpath)),
1146
[(c.start, c.length) for c in coalesced])
1147
resp = protocol.read_response_tuple(True)
1149
if resp[0] != 'readv':
1150
# This should raise an exception
1151
protocol.cancel_read_body()
1152
self._translate_error(resp)
1155
# FIXME: this should know how many bytes are needed, for clarity.
1156
data = protocol.read_body_bytes()
1157
# Cache the results, but only until they have been fulfilled
1159
for c_offset in coalesced:
1160
if len(data) < c_offset.length:
1161
raise errors.ShortReadvError(relpath, c_offset.start,
1162
c_offset.length, actual=len(data))
1163
for suboffset, subsize in c_offset.ranges:
1164
key = (c_offset.start+suboffset, subsize)
1165
data_map[key] = data[suboffset:suboffset+subsize]
1166
data = data[c_offset.length:]
1168
# Now that we've read some data, see if we can yield anything back
1169
while cur_offset_and_size in data_map:
1170
this_data = data_map.pop(cur_offset_and_size)
1171
yield cur_offset_and_size[0], this_data
1172
cur_offset_and_size = offset_stack.next()
1174
def rename(self, rel_from, rel_to):
1175
self._call('rename',
1176
self._remote_path(rel_from),
1177
self._remote_path(rel_to))
1179
def move(self, rel_from, rel_to):
1181
self._remote_path(rel_from),
1182
self._remote_path(rel_to))
1184
def rmdir(self, relpath):
1185
resp = self._call('rmdir', self._remote_path(relpath))
1187
def _translate_error(self, resp, orig_path=None):
1188
"""Raise an exception from a response"""
1195
elif what == 'NoSuchFile':
1196
if orig_path is not None:
1197
error_path = orig_path
1199
error_path = resp[1]
1200
raise errors.NoSuchFile(error_path)
1201
elif what == 'error':
1202
raise errors.SmartProtocolError(unicode(resp[1]))
1203
elif what == 'FileExists':
1204
raise errors.FileExists(resp[1])
1205
elif what == 'DirectoryNotEmpty':
1206
raise errors.DirectoryNotEmpty(resp[1])
1207
elif what == 'ShortReadvError':
1208
raise errors.ShortReadvError(resp[1], int(resp[2]),
1209
int(resp[3]), int(resp[4]))
1210
elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
1211
encoding = str(resp[1]) # encoding must always be a string
1213
start = int(resp[3])
1215
reason = str(resp[5]) # reason must always be a string
1216
if val.startswith('u:'):
1218
elif val.startswith('s:'):
1219
val = val[2:].decode('base64')
1220
if what == 'UnicodeDecodeError':
1221
raise UnicodeDecodeError(encoding, val, start, end, reason)
1222
elif what == 'UnicodeEncodeError':
1223
raise UnicodeEncodeError(encoding, val, start, end, reason)
1224
elif what == "ReadOnlyError":
1225
raise errors.TransportNotPossible('readonly transport')
1227
raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
1229
def disconnect(self):
1230
self._medium.disconnect()
1232
def delete_tree(self, relpath):
1233
raise errors.TransportNotPossible('readonly transport')
1235
def stat(self, relpath):
1236
resp = self._call2('stat', self._remote_path(relpath))
1237
if resp[0] == 'stat':
1238
return SmartStat(int(resp[1]), int(resp[2], 8))
1240
self._translate_error(resp)
1242
## def lock_read(self, relpath):
1243
## """Lock the given file for shared (read) access.
1244
## :return: A lock object, which should be passed to Transport.unlock()
1246
## # The old RemoteBranch ignore lock for reading, so we will
1247
## # continue that tradition and return a bogus lock object.
1248
## class BogusLock(object):
1249
## def __init__(self, path):
1251
## def unlock(self):
1253
## return BogusLock(relpath)
1258
def list_dir(self, relpath):
1259
resp = self._call2('list_dir', self._remote_path(relpath))
1260
if resp[0] == 'names':
1261
return [name.encode('ascii') for name in resp[1:]]
1263
self._translate_error(resp)
1265
def iter_files_recursive(self):
1266
resp = self._call2('iter_files_recursive', self._remote_path(''))
1267
if resp[0] == 'names':
1270
self._translate_error(resp)
1273
class SmartClientMediumRequest(object):
1274
"""A request on a SmartClientMedium.
1276
Each request allows bytes to be provided to it via accept_bytes, and then
1277
the response bytes to be read via read_bytes.
1280
request.accept_bytes('123')
1281
request.finished_writing()
1282
result = request.read_bytes(3)
1283
request.finished_reading()
1285
It is up to the individual SmartClientMedium whether multiple concurrent
1286
requests can exist. See SmartClientMedium.get_request to obtain instances
1287
of SmartClientMediumRequest, and the concrete Medium you are using for
1288
details on concurrency and pipelining.
1291
def __init__(self, medium):
1292
"""Construct a SmartClientMediumRequest for the medium medium."""
1293
self._medium = medium
1294
# we track state by constants - we may want to use the same
1295
# pattern as BodyReader if it gets more complex.
1296
# valid states are: "writing", "reading", "done"
1297
self._state = "writing"
1299
def accept_bytes(self, bytes):
1300
"""Accept bytes for inclusion in this request.
1302
This method may not be be called after finished_writing() has been
1303
called. It depends upon the Medium whether or not the bytes will be
1304
immediately transmitted. Message based Mediums will tend to buffer the
1305
bytes until finished_writing() is called.
1307
:param bytes: A bytestring.
1309
if self._state != "writing":
1310
raise errors.WritingCompleted(self)
1311
self._accept_bytes(bytes)
1313
def _accept_bytes(self, bytes):
1314
"""Helper for accept_bytes.
1316
Accept_bytes checks the state of the request to determing if bytes
1317
should be accepted. After that it hands off to _accept_bytes to do the
1320
raise NotImplementedError(self._accept_bytes)
1322
def finished_reading(self):
1323
"""Inform the request that all desired data has been read.
1325
This will remove the request from the pipeline for its medium (if the
1326
medium supports pipelining) and any further calls to methods on the
1327
request will raise ReadingCompleted.
1329
if self._state == "writing":
1330
raise errors.WritingNotComplete(self)
1331
if self._state != "reading":
1332
raise errors.ReadingCompleted(self)
1333
self._state = "done"
1334
self._finished_reading()
1336
def _finished_reading(self):
1337
"""Helper for finished_reading.
1339
finished_reading checks the state of the request to determine if
1340
finished_reading is allowed, and if it is hands off to _finished_reading
1341
to perform the action.
1343
raise NotImplementedError(self._finished_reading)
1345
def finished_writing(self):
1346
"""Finish the writing phase of this request.
1348
This will flush all pending data for this request along the medium.
1349
After calling finished_writing, you may not call accept_bytes anymore.
1351
if self._state != "writing":
1352
raise errors.WritingCompleted(self)
1353
self._state = "reading"
1354
self._finished_writing()
1356
def _finished_writing(self):
1357
"""Helper for finished_writing.
1359
finished_writing checks the state of the request to determine if
1360
finished_writing is allowed, and if it is hands off to _finished_writing
1361
to perform the action.
1363
raise NotImplementedError(self._finished_writing)
1365
def read_bytes(self, count):
1366
"""Read bytes from this requests response.
1368
This method will block and wait for count bytes to be read. It may not
1369
be invoked until finished_writing() has been called - this is to ensure
1370
a message-based approach to requests, for compatability with message
1371
based mediums like HTTP.
1373
if self._state == "writing":
1374
raise errors.WritingNotComplete(self)
1375
if self._state != "reading":
1376
raise errors.ReadingCompleted(self)
1377
return self._read_bytes(count)
1379
def _read_bytes(self, count):
1380
"""Helper for read_bytes.
1382
read_bytes checks the state of the request to determing if bytes
1383
should be read. After that it hands off to _read_bytes to do the
1386
raise NotImplementedError(self._read_bytes)
1389
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
1390
"""A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
1392
def __init__(self, medium):
1393
SmartClientMediumRequest.__init__(self, medium)
1394
# check that we are safe concurrency wise. If some streams start
1395
# allowing concurrent requests - i.e. via multiplexing - then this
1396
# assert should be moved to SmartClientStreamMedium.get_request,
1397
# and the setting/unsetting of _current_request likewise moved into
1398
# that class : but its unneeded overhead for now. RBC 20060922
1399
if self._medium._current_request is not None:
1400
raise errors.TooManyConcurrentRequests(self._medium)
1401
self._medium._current_request = self
1403
def _accept_bytes(self, bytes):
1404
"""See SmartClientMediumRequest._accept_bytes.
1406
This forwards to self._medium._accept_bytes because we are operating
1407
on the mediums stream.
1409
self._medium._accept_bytes(bytes)
1411
def _finished_reading(self):
1412
"""See SmartClientMediumRequest._finished_reading.
1414
This clears the _current_request on self._medium to allow a new
1415
request to be created.
1417
assert self._medium._current_request is self
1418
self._medium._current_request = None
1420
def _finished_writing(self):
1421
"""See SmartClientMediumRequest._finished_writing.
1423
This invokes self._medium._flush to ensure all bytes are transmitted.
1425
self._medium._flush()
1427
def _read_bytes(self, count):
1428
"""See SmartClientMediumRequest._read_bytes.
1430
This forwards to self._medium._read_bytes because we are operating
1431
on the mediums stream.
1433
return self._medium._read_bytes(count)
1436
class SmartClientRequestProtocolOne(SmartProtocolBase):
1437
"""The client-side protocol for smart version 1."""
1439
def __init__(self, request):
1440
"""Construct a SmartClientRequestProtocolOne.
1442
:param request: A SmartClientMediumRequest to serialise onto and
1445
self._request = request
1446
self._body_buffer = None
1448
def call(self, *args):
1449
bytes = _encode_tuple(args)
1450
self._request.accept_bytes(bytes)
1451
self._request.finished_writing()
1453
def call_with_body_bytes(self, args, body):
1454
"""Make a remote call of args with body bytes 'body'.
1456
After calling this, call read_response_tuple to find the result out.
1458
bytes = _encode_tuple(args)
1459
self._request.accept_bytes(bytes)
1460
bytes = self._encode_bulk_data(body)
1461
self._request.accept_bytes(bytes)
1462
self._request.finished_writing()
1464
def call_with_body_readv_array(self, args, body):
1465
"""Make a remote call with a readv array.
1467
The body is encoded with one line per readv offset pair. The numbers in
1468
each pair are separated by a comma, and no trailing \n is emitted.
1470
bytes = _encode_tuple(args)
1471
self._request.accept_bytes(bytes)
1472
readv_bytes = self._serialise_offsets(body)
1473
bytes = self._encode_bulk_data(readv_bytes)
1474
self._request.accept_bytes(bytes)
1475
self._request.finished_writing()
1477
def cancel_read_body(self):
1478
"""After expecting a body, a response code may indicate one otherwise.
1480
This method lets the domain client inform the protocol that no body
1481
will be transmitted. This is a terminal method: after calling it the
1482
protocol is not able to be used further.
1484
self._request.finished_reading()
1486
def read_response_tuple(self, expect_body=False):
1487
"""Read a response tuple from the wire.
1489
This should only be called once.
1491
result = self._recv_tuple()
1493
self._request.finished_reading()
1496
def read_body_bytes(self, count=-1):
1497
"""Read bytes from the body, decoding into a byte stream.
1499
We read all bytes at once to ensure we've checked the trailer for
1500
errors, and then feed the buffer back as read_body_bytes is called.
1502
if self._body_buffer is not None:
1503
return self._body_buffer.read(count)
1504
_body_decoder = LengthPrefixedBodyDecoder()
1506
while not _body_decoder.finished_reading:
1507
bytes_wanted = _body_decoder.bytes_left
1508
if bytes_wanted is None:
1510
bytes = self._request.read_bytes(bytes_wanted)
1511
_body_decoder.accept_bytes(bytes)
1512
self._request.finished_reading()
1513
self._body_buffer = StringIO(_body_decoder.read_pending_data())
1514
# XXX: TODO check the trailer result.
1515
return self._body_buffer.read(count)
1517
def _recv_tuple(self):
1518
"""Receive a tuple from the medium request."""
1520
while not line or line[-1] != '\n':
1521
# yes, this is inefficient - but tuples are short.
1522
new_char = self._request.read_bytes(1)
1524
assert new_char != '', "end of file reading from server."
1525
return _decode_tuple(line)
1527
def query_version(self):
1528
"""Return protocol version number of the server."""
1530
resp = self.read_response_tuple()
1531
if resp == ('ok', '1'):
1534
raise errors.SmartProtocolError("bad response %r" % (resp,))
1537
class SmartClientMedium(object):
1538
"""Smart client is a medium for sending smart protocol requests over."""
1540
def disconnect(self):
1541
"""If this medium maintains a persistent connection, close it.
1543
The default implementation does nothing.
1547
class SmartClientStreamMedium(SmartClientMedium):
1548
"""Stream based medium common class.
1550
SmartClientStreamMediums operate on a stream. All subclasses use a common
1551
SmartClientStreamMediumRequest for their requests, and should implement
1552
_accept_bytes and _read_bytes to allow the request objects to send and
1557
self._current_request = None
1559
def accept_bytes(self, bytes):
1560
self._accept_bytes(bytes)
1563
"""The SmartClientStreamMedium knows how to close the stream when it is
1569
"""Flush the output stream.
1571
This method is used by the SmartClientStreamMediumRequest to ensure that
1572
all data for a request is sent, to avoid long timeouts or deadlocks.
1574
raise NotImplementedError(self._flush)
1576
def get_request(self):
1577
"""See SmartClientMedium.get_request().
1579
SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
1582
return SmartClientStreamMediumRequest(self)
1584
def read_bytes(self, count):
1585
return self._read_bytes(count)
1588
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
1589
"""A client medium using simple pipes.
1591
This client does not manage the pipes: it assumes they will always be open.
1594
def __init__(self, readable_pipe, writeable_pipe):
1595
SmartClientStreamMedium.__init__(self)
1596
self._readable_pipe = readable_pipe
1597
self._writeable_pipe = writeable_pipe
1599
def _accept_bytes(self, bytes):
1600
"""See SmartClientStreamMedium.accept_bytes."""
1601
self._writeable_pipe.write(bytes)
1604
"""See SmartClientStreamMedium._flush()."""
1605
self._writeable_pipe.flush()
1607
def _read_bytes(self, count):
1608
"""See SmartClientStreamMedium._read_bytes."""
1609
return self._readable_pipe.read(count)
1612
class SmartSSHClientMedium(SmartClientStreamMedium):
1613
"""A client medium using SSH."""
1615
def __init__(self, host, port=None, username=None, password=None,
1617
"""Creates a client that will connect on the first use.
1619
:param vendor: An optional override for the ssh vendor to use. See
1620
bzrlib.transport.ssh for details on ssh vendors.
1622
SmartClientStreamMedium.__init__(self)
1623
self._connected = False
1625
self._password = password
1627
self._username = username
1628
self._read_from = None
1629
self._ssh_connection = None
1630
self._vendor = vendor
1631
self._write_to = None
1633
def _accept_bytes(self, bytes):
1634
"""See SmartClientStreamMedium.accept_bytes."""
1635
self._ensure_connection()
1636
self._write_to.write(bytes)
1638
def disconnect(self):
1639
"""See SmartClientMedium.disconnect()."""
1640
if not self._connected:
1642
self._read_from.close()
1643
self._write_to.close()
1644
self._ssh_connection.close()
1645
self._connected = False
1647
def _ensure_connection(self):
1648
"""Connect this medium if not already connected."""
1651
executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
1652
if self._vendor is None:
1653
vendor = ssh._get_ssh_vendor()
1655
vendor = self._vendor
1656
self._ssh_connection = vendor.connect_ssh(self._username,
1657
self._password, self._host, self._port,
1658
command=[executable, 'serve', '--inet', '--directory=/',
1660
self._read_from, self._write_to = \
1661
self._ssh_connection.get_filelike_channels()
1662
self._connected = True
1665
"""See SmartClientStreamMedium._flush()."""
1666
self._write_to.flush()
1668
def _read_bytes(self, count):
1669
"""See SmartClientStreamMedium.read_bytes."""
1670
if not self._connected:
1671
raise errors.MediumNotConnected(self)
1672
return self._read_from.read(count)
1675
class SmartTCPClientMedium(SmartClientStreamMedium):
1676
"""A client medium using TCP."""
1678
def __init__(self, host, port):
1679
"""Creates a client that will connect on the first use."""
1680
SmartClientStreamMedium.__init__(self)
1681
self._connected = False
1686
def _accept_bytes(self, bytes):
1687
"""See SmartClientMedium.accept_bytes."""
1688
self._ensure_connection()
1689
self._socket.sendall(bytes)
1691
def disconnect(self):
1692
"""See SmartClientMedium.disconnect()."""
1693
if not self._connected:
1695
self._socket.close()
1697
self._connected = False
1699
def _ensure_connection(self):
1700
"""Connect this medium if not already connected."""
1703
self._socket = socket.socket()
1704
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
1705
result = self._socket.connect_ex((self._host, int(self._port)))
1707
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1708
(self._host, self._port, os.strerror(result)))
1709
self._connected = True
1712
"""See SmartClientStreamMedium._flush().
1714
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
1715
add a means to do a flush, but that can be done in the future.
1718
def _read_bytes(self, count):
1719
"""See SmartClientMedium.read_bytes."""
1720
if not self._connected:
1721
raise errors.MediumNotConnected(self)
1722
return self._socket.recv(count)
1725
class SmartTCPTransport(SmartTransport):
1726
"""Connection to smart server over plain tcp.
1728
This is essentially just a factory to get 'RemoteTransport(url,
1729
SmartTCPClientMedium).
1732
def __init__(self, url):
1733
_scheme, _username, _password, _host, _port, _path = \
1734
transport.split_url(url)
1737
except (ValueError, TypeError), e:
1738
raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
1739
medium = SmartTCPClientMedium(_host, _port)
1740
super(SmartTCPTransport, self).__init__(url, medium=medium)
1744
from bzrlib.transport import ssh
1745
except errors.ParamikoNotPresent:
1746
# no paramiko, no SSHTransport.
1749
class SmartSSHTransport(SmartTransport):
1750
"""Connection to smart server over SSH.
1752
This is essentially just a factory to get 'RemoteTransport(url,
1753
SmartSSHClientMedium).
1756
def __init__(self, url):
1757
_scheme, _username, _password, _host, _port, _path = \
1758
transport.split_url(url)
1760
if _port is not None:
1762
except (ValueError, TypeError), e:
1763
raise errors.InvalidURL(path=url, extra="invalid port %s" %
1765
medium = SmartSSHClientMedium(_host, _port, _username, _password)
1766
super(SmartSSHTransport, self).__init__(url, medium=medium)
1769
def get_test_permutations():
1770
"""Return (transport, server) permutations for testing."""
1771
### We may need a little more test framework support to construct an
1772
### appropriate RemoteTransport in the future.
1773
return [(SmartTCPTransport, SmartTCPServer_for_testing)]