1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Smart-server protocol, client and server.
19
Requests are sent as a command and list of arguments, followed by optional
20
bulk body data. Responses are similarly a response and list of arguments,
21
followed by bulk body data. ::
24
Fields are separated by Ctrl-A.
25
BULK_DATA := CHUNK+ TRAILER
26
Chunks can be repeated as many times as necessary.
27
CHUNK := CHUNK_LEN CHUNK_BODY
28
CHUNK_LEN := DIGIT+ NEWLINE
29
Gives the number of bytes in the following chunk.
30
CHUNK_BODY := BYTE[chunk_len]
31
TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
32
SUCCESS_TRAILER := 'done' NEWLINE
35
Paths are passed across the network. The client needs to see a namespace that
36
includes any repository that might need to be referenced, and the client needs
37
to know about a root directory beyond which it cannot ascend.
39
Servers run over ssh will typically want to be able to access any path the user
40
can access. Public servers on the other hand (which might be over http, ssh
41
or tcp) will typically want to restrict access to only a particular directory
42
and its children, so will want to do a software virtual root at that level.
43
In other words they'll want to rewrite incoming paths to be under that level
44
(and prevent escaping using ../ tricks.)
46
URLs that include ~ should probably be passed across to the server verbatim
47
and the server can expand them. This will proably not be meaningful when
48
limited to a directory?
53
# TODO: A plain integer from query_version is too simple; should give some
56
# TODO: Server should probably catch exceptions within itself and send them
57
# back across the network. (But shouldn't catch KeyboardInterrupt etc)
58
# Also needs to somehow report protocol errors like bad requests. Need to
59
# consider how we'll handle error reporting, e.g. if we get halfway through a
60
# bulk transfer and then something goes wrong.
62
# TODO: Standard marker at start of request/response lines?
64
# TODO: Make each request and response self-validatable, e.g. with checksums.
66
# TODO: get/put objects could be changed to gradually read back the data as it
67
# comes across the network
69
# TODO: What should the server do if it hits an error and has to terminate?
71
# TODO: is it useful to allow multiple chunks in the bulk data?
73
# TODO: If we get an exception during transmission of bulk data we can't just
74
# emit the exception because it won't be seen.
75
# John proposes: I think it would be worthwhile to have a header on each
76
# chunk, that indicates it is another chunk. Then you can send an 'error'
77
# chunk as long as you finish the previous chunk.
79
# TODO: Clone method on Transport; should work up towards parent directory;
80
# unclear how this should be stored or communicated to the server... maybe
81
# just pass it on all relevant requests?
83
# TODO: Better name than clone() for changing between directories. How about
84
# open_dir or change_dir or chdir?
86
# TODO: Is it really good to have the notion of current directory within the
87
# connection? Perhaps all Transports should factor out a common connection
88
# from the thing that has the directory context?
90
# TODO: Pull more things common to sftp and ssh to a higher level.
92
# TODO: The server that manages a connection should be quite small and retain
93
# minimum state because each of the requests are supposed to be stateless.
94
# Then we can write another implementation that maps to http.
96
# TODO: What to do when a client connection is garbage collected? Maybe just
97
# abruptly drop the connection?
99
# TODO: Server in some cases will need to restrict access to files outside of
100
# a particular root directory. LocalTransport doesn't do anything to stop you
101
# ascending above the base directory, so we need to prevent paths
102
# containing '..' in either the server or transport layers. (Also need to
103
# consider what happens if someone creates a symlink pointing outside the
106
# TODO: Server should rebase absolute paths coming across the network to put
107
# them under the virtual root, if one is in use. LocalTransport currently
108
# doesn't do that; if you give it an absolute path it just uses it.
110
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
111
# urlescape them instead. Indeed possibly this should just literally be
114
# FIXME: This transport, with several others, has imperfect handling of paths
115
# within urls. It'd probably be better for ".." from a root to raise an error
116
# rather than return the same directory as we do at present.
118
# TODO: Rather than working at the Transport layer we want a Branch,
119
# Repository or BzrDir objects that talk to a server.
121
# TODO: Probably want some way for server commands to gradually produce body
122
# data rather than passing it as a string; they could perhaps pass an
123
# iterator-like callback that will gradually yield data; it probably needs a
124
# close() method that will always be closed to do any necessary cleanup.
126
# TODO: Split the actual smart server from the ssh encoding of it.
128
# TODO: Perhaps support file-level readwrite operations over the transport
131
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
132
# branch doing file-level operations.
134
# TODO: jam 20060915 _decode_tuple is acting directly on input over
135
# the socket, and it assumes everything is UTF8 sections separated
136
# by \001. Which means a request like '\002' Will abort the connection
137
# because of a UnicodeDecodeError. It does look like invalid data will
138
# kill the SmartStreamServer, but only with an abort + exception, and
139
# the overall server shouldn't die.
141
from cStringIO import StringIO
159
from bzrlib.bundle.serializer import write_bundle
160
from bzrlib.trace import mutter
161
from bzrlib.transport import local
163
# must do this otherwise urllib can't parse the urls properly :(
164
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
165
transport.register_urlparse_netloc_protocol(scheme)
169
def _recv_tuple(from_file):
170
req_line = from_file.readline()
171
return _decode_tuple(req_line)
174
def _decode_tuple(req_line):
175
if req_line == None or req_line == '':
177
if req_line[-1] != '\n':
178
raise errors.SmartProtocolError("request %r not terminated" % req_line)
179
return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
182
def _encode_tuple(args):
183
"""Encode the tuple args to a bytestream."""
184
return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
187
class SmartProtocolBase(object):
188
"""Methods common to client and server"""
190
def _send_bulk_data(self, body):
191
"""Send chunked body data"""
192
assert isinstance(body, str)
193
bytes = ''.join(('%d\n' % len(body), body, 'done\n'))
194
self._write_and_flush(bytes)
196
# TODO: this only actually accomodates a single block; possibly should support
198
def _recv_bulk(self):
199
chunk_len = self._in.readline()
201
chunk_len = int(chunk_len)
203
raise errors.SmartProtocolError("bad chunk length line %r" % chunk_len)
204
bulk = self._in.read(chunk_len)
205
if len(bulk) != chunk_len:
206
raise errors.SmartProtocolError("short read fetching bulk data chunk")
210
def _recv_tuple(self):
211
return _recv_tuple(self._in)
213
def _recv_trailer(self):
214
resp = self._recv_tuple()
215
if resp == ('done', ):
218
self._translate_error(resp)
220
def _serialise_offsets(self, offsets):
221
"""Serialise a readv offset list."""
223
for start, length in offsets:
224
txt.append('%d,%d' % (start, length))
225
return '\n'.join(txt)
227
def _write_and_flush(self, bytes):
228
"""Write bytes to self._out and flush it."""
229
# XXX: this will be inefficient. Just ask Robert.
230
self._out.write(bytes)
234
class SmartStreamServer(SmartProtocolBase):
235
"""Handles smart commands coming over a stream.
237
The stream may be a pipe connected to sshd, or a tcp socket, or an
238
in-process fifo for testing.
240
One instance is created for each connected client; it can serve multiple
241
requests in the lifetime of the connection.
243
The server passes requests through to an underlying backing transport,
244
which will typically be a LocalTransport looking at the server's filesystem.
247
def __init__(self, in_file, out_file, backing_transport):
248
"""Construct new server.
250
:param in_file: Python file from which requests can be read.
251
:param out_file: Python file to write responses.
252
:param backing_transport: Transport for the directory served.
256
self.smart_server = SmartServer(backing_transport)
257
# server can call back to us to get bulk data - this is not really
258
# ideal, they should get it per request instead
259
self.smart_server._recv_body = self._recv_bulk
261
def _recv_tuple(self):
262
"""Read a request from the client and return as a tuple.
264
Returns None at end of file (if the client closed the connection.)
266
return _recv_tuple(self._in)
268
def _send_tuple(self, args):
269
"""Send response header"""
270
return self._write_and_flush(_encode_tuple(args))
272
def _send_error_and_disconnect(self, exception):
273
self._send_tuple(('error', str(exception)))
277
def _serve_one_request(self):
278
"""Read one request from input, process, send back a response.
280
:return: False if the server should terminate, otherwise None.
282
req_args = self._recv_tuple()
283
mutter('server received %r' % (req_args,))
285
# client closed connection
286
return False # shutdown server
288
response = self.smart_server.dispatch_command(req_args[0], req_args[1:])
289
mutter('server sending %r' % (response.args,))
290
self._send_tuple(response.args)
291
if response.body is not None:
292
self._send_bulk_data(response.body)
293
except KeyboardInterrupt:
296
# everything else: pass to client, flush, and quit
297
self._send_error_and_disconnect(e)
301
"""Serve requests until the client disconnects."""
302
# Keep a reference to stderr because the sys module's globals get set to
303
# None during interpreter shutdown.
304
from sys import stderr
306
while self._serve_one_request() != False:
309
stderr.write("%s terminating on exception %s\n" % (self, e))
313
class SmartServerResponse(object):
314
"""Response generated by SmartServer."""
316
def __init__(self, args, body=None):
320
# XXX: TODO: Create a SmartServerRequest which will take the responsibility
321
# for delivering the data for a request. This could be done with as the
322
# StreamServer, though that would create conflation between request and response
323
# which may be undesirable.
326
class SmartServer(object):
327
"""Protocol logic for smart server.
329
This doesn't handle serialization at all, it just processes requests and
333
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServer not contain
334
# encoding or decoding logic to allow the wire protocol to vary from the
335
# object protocol: we will want to tweak the wire protocol separate from
336
# the object model, and ideally we will be able to do that without having
337
# a SmartServer subclass for each wire protocol, rather just a Protocol
340
# TODO: Better way of representing the body for commands that take it,
341
# and allow it to be streamed into the server.
343
def __init__(self, backing_transport):
344
self._backing_transport = backing_transport
347
"""Answer a version request with my version."""
348
return SmartServerResponse(('ok', '1'))
350
def do_has(self, relpath):
351
r = self._backing_transport.has(relpath) and 'yes' or 'no'
352
return SmartServerResponse((r,))
354
def do_get(self, relpath):
355
backing_bytes = self._backing_transport.get_bytes(relpath)
356
return SmartServerResponse(('ok',), backing_bytes)
358
def _deserialise_optional_mode(self, mode):
359
# XXX: FIXME this should be on the protocol object.
365
def do_append(self, relpath, mode):
366
old_length = self._backing_transport.append_bytes(
367
relpath, self._recv_body(), self._deserialise_optional_mode(mode))
368
return SmartServerResponse(('appended', '%d' % old_length))
370
def do_delete(self, relpath):
371
self._backing_transport.delete(relpath)
373
def do_iter_files_recursive(self, abspath):
374
# XXX: the path handling needs some thought.
375
#relpath = self._backing_transport.relpath(abspath)
376
transport = self._backing_transport.clone(abspath)
377
filenames = transport.iter_files_recursive()
378
return SmartServerResponse(('names',) + tuple(filenames))
380
def do_list_dir(self, relpath):
381
filenames = self._backing_transport.list_dir(relpath)
382
return SmartServerResponse(('names',) + tuple(filenames))
384
def do_mkdir(self, relpath, mode):
385
self._backing_transport.mkdir(relpath,
386
self._deserialise_optional_mode(mode))
388
def do_move(self, rel_from, rel_to):
389
self._backing_transport.move(rel_from, rel_to)
391
def do_put(self, relpath, mode):
392
self._backing_transport.put_bytes(relpath,
394
self._deserialise_optional_mode(mode))
396
def _deserialise_offsets(self, text):
397
# XXX: FIXME this should be on the protocol object.
399
for line in text.split('\n'):
402
start, length = line.split(',')
403
offsets.append((int(start), int(length)))
406
def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
407
create_parent_dir = (create_parent == 'T')
408
self._backing_transport.put_bytes_non_atomic(relpath,
410
mode=self._deserialise_optional_mode(mode),
411
create_parent_dir=create_parent_dir,
412
dir_mode=self._deserialise_optional_mode(dir_mode))
414
def do_readv(self, relpath):
415
offsets = self._deserialise_offsets(self._recv_body())
416
backing_bytes = ''.join(bytes for offset, bytes in
417
self._backing_transport.readv(relpath, offsets))
418
return SmartServerResponse(('readv',), backing_bytes)
420
def do_rename(self, rel_from, rel_to):
421
self._backing_transport.rename(rel_from, rel_to)
423
def do_rmdir(self, relpath):
424
self._backing_transport.rmdir(relpath)
426
def do_stat(self, relpath):
427
stat = self._backing_transport.stat(relpath)
428
return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
430
def do_get_bundle(self, path, revision_id):
431
# open transport relative to our base
432
t = self._backing_transport.clone(path)
433
control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
434
repo = control.open_repository()
435
tmpf = tempfile.TemporaryFile()
436
base_revision = revision.NULL_REVISION
437
write_bundle(repo, revision_id, base_revision, tmpf)
439
return SmartServerResponse((), tmpf.read())
441
def dispatch_command(self, cmd, args):
442
func = getattr(self, 'do_' + cmd, None)
444
raise errors.SmartProtocolError("bad request %r" % (cmd,))
448
result = SmartServerResponse(('ok',))
450
except errors.NoSuchFile, e:
451
return SmartServerResponse(('NoSuchFile', e.path))
452
except errors.FileExists, e:
453
return SmartServerResponse(('FileExists', e.path))
454
except errors.DirectoryNotEmpty, e:
455
return SmartServerResponse(('DirectoryNotEmpty', e.path))
456
except errors.ShortReadvError, e:
457
return SmartServerResponse(('ShortReadvError',
458
e.path, str(e.offset), str(e.length), str(e.actual)))
459
except UnicodeError, e:
460
# If it is a DecodeError, than most likely we are starting
461
# with a plain string
462
str_or_unicode = e.object
463
if isinstance(str_or_unicode, unicode):
464
val = u'u:' + str_or_unicode
466
val = u's:' + str_or_unicode.encode('base64')
467
# This handles UnicodeEncodeError or UnicodeDecodeError
468
return SmartServerResponse((e.__class__.__name__,
469
e.encoding, val, str(e.start), str(e.end), e.reason))
472
class SmartTCPServer(object):
473
"""Listens on a TCP socket and accepts connections from smart clients"""
475
def __init__(self, backing_transport=None, host='127.0.0.1', port=0):
476
"""Construct a new server.
478
To actually start it running, call either start_background_thread or
481
:param host: Name of the interface to listen on.
482
:param port: TCP port to listen on, or 0 to allocate a transient port.
484
if backing_transport is None:
485
backing_transport = memory.MemoryTransport()
486
self._server_socket = socket.socket()
487
self._server_socket.bind((host, port))
488
self.port = self._server_socket.getsockname()[1]
489
self._server_socket.listen(1)
490
self._server_socket.settimeout(1)
491
self.backing_transport = backing_transport
494
# let connections timeout so that we get a chance to terminate
495
# Keep a reference to the exceptions we want to catch because the socket
496
# module's globals get set to None during interpreter shutdown.
497
from socket import timeout as socket_timeout
498
from socket import error as socket_error
499
self._should_terminate = False
500
while not self._should_terminate:
502
self.accept_and_serve()
503
except socket_timeout:
504
# just check if we're asked to stop
506
except socket_error, e:
507
trace.warning("client disconnected: %s", e)
511
"""Return the url of the server"""
512
return "bzr://%s:%d/" % self._server_socket.getsockname()
514
def accept_and_serve(self):
515
conn, client_addr = self._server_socket.accept()
516
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
517
from_client = conn.makefile('r')
518
to_client = conn.makefile('w')
519
handler = SmartStreamServer(from_client, to_client,
520
self.backing_transport)
521
connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
522
connection_thread.setDaemon(True)
523
connection_thread.start()
525
def start_background_thread(self):
526
self._server_thread = threading.Thread(None,
528
name='server-' + self.get_url())
529
self._server_thread.setDaemon(True)
530
self._server_thread.start()
532
def stop_background_thread(self):
533
self._should_terminate = True
534
# self._server_socket.close()
535
# we used to join the thread, but it's not really necessary; it will
537
## self._server_thread.join()
540
class SmartTCPServer_for_testing(SmartTCPServer):
541
"""Server suitable for use by transport tests.
543
This server is backed by the process's cwd.
547
self._homedir = os.getcwd()
548
# The server is set up by default like for ssh access: the client
549
# passes filesystem-absolute paths; therefore the server must look
550
# them up relative to the root directory. it might be better to act
551
# a public server and have the server rewrite paths into the test
553
SmartTCPServer.__init__(self, transport.get_transport("file:///"))
556
"""Set up server for testing"""
557
self.start_background_thread()
560
self.stop_background_thread()
563
"""Return the url of the server"""
564
host, port = self._server_socket.getsockname()
565
# XXX: I think this is likely to break on windows -- self._homedir will
566
# have backslashes (and maybe a drive letter?).
567
# -- Andrew Bennetts, 2006-08-29
568
return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
570
def get_bogus_url(self):
571
"""Return a URL which will fail to connect"""
572
return 'bzr://127.0.0.1:1/'
575
class SmartStat(object):
577
def __init__(self, size, mode):
582
class SmartTransport(transport.Transport):
583
"""Connection to a smart server.
585
The connection holds references to pipes that can be used to send requests
588
The connection has a notion of the current directory to which it's
589
connected; this is incorporated in filenames passed to the server.
591
This supports some higher-level RPC operations and can also be treated
592
like a Transport to do file-like operations.
594
The connection can be made over a tcp socket, or (in future) an ssh pipe
595
or a series of http requests. There are concrete subclasses for each
596
type: SmartTCPTransport, etc.
599
# IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
600
# responsibilities: Put those on SmartClient or similar. This is vital for
601
# the ability to support multiple versions of the smart protocol over time:
602
# SmartTransport is an adapter from the Transport object model to the
603
# SmartClient model, not an encoder.
605
def __init__(self, url, clone_from=None, client=None):
608
:param client: ignored when clone_from is not None.
610
### Technically super() here is faulty because Transport's __init__
611
### fails to take 2 parameters, and if super were to choose a silly
612
### initialisation order things would blow up.
613
if not url.endswith('/'):
615
super(SmartTransport, self).__init__(url)
616
self._scheme, self._username, self._password, self._host, self._port, self._path = \
617
transport.split_url(url)
618
if clone_from is None:
620
self._client = SmartStreamClient(self._connect_to_server)
622
self._client = client
624
# credentials may be stripped from the base in some circumstances
625
# as yet to be clearly defined or documented, so copy them.
626
self._username = clone_from._username
627
# reuse same connection
628
self._client = clone_from._client
630
def abspath(self, relpath):
631
"""Return the full url to the given relative path.
633
@param relpath: the relative path or path components
634
@type relpath: str or list
636
return self._unparse_url(self._remote_path(relpath))
638
def clone(self, relative_url):
639
"""Make a new SmartTransport related to me, sharing the same connection.
641
This essentially opens a handle on a different remote directory.
643
if relative_url is None:
644
return self.__class__(self.base, self)
646
return self.__class__(self.abspath(relative_url), self)
648
def is_readonly(self):
649
"""Smart server transport can do read/write file operations."""
652
def get_smart_client(self):
655
def _unparse_url(self, path):
656
"""Return URL for a path.
658
:see: SFTPUrlHandling._unparse_url
660
# TODO: Eventually it should be possible to unify this with
661
# SFTPUrlHandling._unparse_url?
664
path = urllib.quote(path)
665
netloc = urllib.quote(self._host)
666
if self._username is not None:
667
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
668
if self._port is not None:
669
netloc = '%s:%d' % (netloc, self._port)
670
return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
672
def _remote_path(self, relpath):
673
"""Returns the Unicode version of the absolute path for relpath."""
674
return self._combine_paths(self._path, relpath)
676
def has(self, relpath):
677
"""Indicate whether a remote file of the given name exists or not.
679
:see: Transport.has()
681
resp = self._client._call('has', self._remote_path(relpath))
682
if resp == ('yes', ):
684
elif resp == ('no', ):
687
self._translate_error(resp)
689
def get(self, relpath):
690
"""Return file-like object reading the contents of a remote file.
692
:see: Transport.get_bytes()/get_file()
694
remote = self._remote_path(relpath)
695
resp = self._client._call('get', remote)
697
self._translate_error(resp, relpath)
698
return StringIO(self._client._recv_bulk())
700
def _serialise_optional_mode(self, mode):
706
def mkdir(self, relpath, mode=None):
707
resp = self._client._call('mkdir',
708
self._remote_path(relpath),
709
self._serialise_optional_mode(mode))
710
self._translate_error(resp)
712
def put_bytes(self, relpath, upload_contents, mode=None):
713
# FIXME: upload_file is probably not safe for non-ascii characters -
714
# should probably just pass all parameters as length-delimited
716
resp = self._client._call_with_upload(
718
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
720
self._translate_error(resp)
722
def put_bytes_non_atomic(self, relpath, bytes, mode=None,
723
create_parent_dir=False,
725
"""See Transport.put_bytes_non_atomic."""
726
# FIXME: no encoding in the transport!
727
create_parent_str = 'F'
728
if create_parent_dir:
729
create_parent_str = 'T'
731
resp = self._client._call_with_upload(
733
(self._remote_path(relpath), self._serialise_optional_mode(mode),
734
create_parent_str, self._serialise_optional_mode(dir_mode)),
736
self._translate_error(resp)
738
def put_file(self, relpath, upload_file, mode=None):
739
# its not ideal to seek back, but currently put_non_atomic_file depends
740
# on transports not reading before failing - which is a faulty
741
# assumption I think - RBC 20060915
742
pos = upload_file.tell()
744
return self.put_bytes(relpath, upload_file.read(), mode)
746
upload_file.seek(pos)
749
def put_file_non_atomic(self, relpath, f, mode=None,
750
create_parent_dir=False,
752
return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
753
create_parent_dir=create_parent_dir,
756
def append_file(self, relpath, from_file, mode=None):
757
return self.append_bytes(relpath, from_file.read(), mode)
759
def append_bytes(self, relpath, bytes, mode=None):
760
resp = self._client._call_with_upload(
762
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
764
if resp[0] == 'appended':
766
self._translate_error(resp)
768
def delete(self, relpath):
769
resp = self._client._call('delete', self._remote_path(relpath))
770
self._translate_error(resp)
772
def readv(self, relpath, offsets):
776
offsets = list(offsets)
778
sorted_offsets = sorted(offsets)
779
# turn the list of offsets into a stack
780
offset_stack = iter(offsets)
781
cur_offset_and_size = offset_stack.next()
782
coalesced = list(self._coalesce_offsets(sorted_offsets,
783
limit=self._max_readv_combine,
784
fudge_factor=self._bytes_to_read_before_seek))
787
resp = self._client._call_with_upload(
789
(self._remote_path(relpath),),
790
self._client._serialise_offsets((c.start, c.length) for c in coalesced))
792
if resp[0] != 'readv':
793
# This should raise an exception
794
self._translate_error(resp)
797
data = self._client._recv_bulk()
798
# Cache the results, but only until they have been fulfilled
800
for c_offset in coalesced:
801
if len(data) < c_offset.length:
802
raise errors.ShortReadvError(relpath, c_offset.start,
803
c_offset.length, actual=len(data))
804
for suboffset, subsize in c_offset.ranges:
805
key = (c_offset.start+suboffset, subsize)
806
data_map[key] = data[suboffset:suboffset+subsize]
807
data = data[c_offset.length:]
809
# Now that we've read some data, see if we can yield anything back
810
while cur_offset_and_size in data_map:
811
this_data = data_map.pop(cur_offset_and_size)
812
yield cur_offset_and_size[0], this_data
813
cur_offset_and_size = offset_stack.next()
815
def rename(self, rel_from, rel_to):
817
self._remote_path(rel_from),
818
self._remote_path(rel_to))
820
def move(self, rel_from, rel_to):
822
self._remote_path(rel_from),
823
self._remote_path(rel_to))
825
def rmdir(self, relpath):
826
resp = self._call('rmdir', self._remote_path(relpath))
828
def _call(self, method, *args):
829
resp = self._client._call(method, *args)
830
self._translate_error(resp)
832
def _translate_error(self, resp, orig_path=None):
833
"""Raise an exception from a response"""
840
elif what == 'NoSuchFile':
841
if orig_path is not None:
842
error_path = orig_path
845
raise errors.NoSuchFile(error_path)
846
elif what == 'error':
847
raise errors.SmartProtocolError(unicode(resp[1]))
848
elif what == 'FileExists':
849
raise errors.FileExists(resp[1])
850
elif what == 'DirectoryNotEmpty':
851
raise errors.DirectoryNotEmpty(resp[1])
852
elif what == 'ShortReadvError':
853
raise errors.ShortReadvError(resp[1], int(resp[2]),
854
int(resp[3]), int(resp[4]))
855
elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
856
encoding = str(resp[1]) # encoding must always be a string
860
reason = str(resp[5]) # reason must always be a string
861
if val.startswith('u:'):
863
elif val.startswith('s:'):
864
val = val[2:].decode('base64')
865
if what == 'UnicodeDecodeError':
866
raise UnicodeDecodeError(encoding, val, start, end, reason)
867
elif what == 'UnicodeEncodeError':
868
raise UnicodeEncodeError(encoding, val, start, end, reason)
870
raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
872
def _send_tuple(self, args):
873
self._client._send_tuple(args)
875
def _recv_tuple(self):
876
return self._client._recv_tuple()
878
def disconnect(self):
879
self._client.disconnect()
881
def delete_tree(self, relpath):
882
raise errors.TransportNotPossible('readonly transport')
884
def stat(self, relpath):
885
resp = self._client._call('stat', self._remote_path(relpath))
886
if resp[0] == 'stat':
887
return SmartStat(int(resp[1]), int(resp[2], 8))
889
self._translate_error(resp)
891
## def lock_read(self, relpath):
892
## """Lock the given file for shared (read) access.
893
## :return: A lock object, which should be passed to Transport.unlock()
895
## # The old RemoteBranch ignore lock for reading, so we will
896
## # continue that tradition and return a bogus lock object.
897
## class BogusLock(object):
898
## def __init__(self, path):
902
## return BogusLock(relpath)
907
def list_dir(self, relpath):
908
resp = self._client._call('list_dir',
909
self._remote_path(relpath))
910
if resp[0] == 'names':
911
return [name.encode('ascii') for name in resp[1:]]
913
self._translate_error(resp)
915
def iter_files_recursive(self):
916
resp = self._client._call('iter_files_recursive',
917
self._remote_path(''))
918
if resp[0] == 'names':
921
self._translate_error(resp)
924
class SmartStreamClient(SmartProtocolBase):
925
"""Connection to smart server over two streams"""
927
def __init__(self, connect_func):
928
self._connect_func = connect_func
929
self._connected = False
934
def _ensure_connection(self):
935
if not self._connected:
936
self._in, self._out = self._connect_func()
937
self._connected = True
939
def _send_tuple(self, args):
940
self._ensure_connection()
941
return self._write_and_flush(_encode_tuple(args))
943
def _send_bulk_data(self, body):
944
self._ensure_connection()
945
SmartProtocolBase._send_bulk_data(self, body)
947
def _recv_bulk(self):
948
self._ensure_connection()
949
return SmartProtocolBase._recv_bulk(self)
951
def _recv_tuple(self):
952
self._ensure_connection()
953
return SmartProtocolBase._recv_tuple(self)
955
def _recv_trailer(self):
956
self._ensure_connection()
957
return SmartProtocolBase._recv_trailer(self)
959
def disconnect(self):
960
"""Close connection to the server"""
965
def _call(self, *args):
966
self._send_tuple(args)
967
return self._recv_tuple()
969
def _call_with_upload(self, method, args, body):
970
"""Call an rpc, supplying bulk upload data.
972
:param method: method name to call
973
:param args: parameter args tuple
974
:param body: upload body as a byte string
976
self._send_tuple((method,) + args)
977
self._send_bulk_data(body)
978
return self._recv_tuple()
980
def query_version(self):
981
"""Return protocol version number of the server."""
982
# XXX: should make sure it's empty
983
self._send_tuple(('hello',))
984
resp = self._recv_tuple()
985
if resp == ('ok', '1'):
988
raise errors.SmartProtocolError("bad response %r" % (resp,))
991
class SmartTCPTransport(SmartTransport):
992
"""Connection to smart server over plain tcp"""
994
def __init__(self, url, clone_from=None):
995
super(SmartTCPTransport, self).__init__(url, clone_from)
997
self._port = int(self._port)
998
except (ValueError, TypeError), e:
999
raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
1002
def _connect_to_server(self):
1003
self._socket = socket.socket()
1004
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
1005
result = self._socket.connect_ex((self._host, int(self._port)))
1007
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1008
(self._host, self._port, os.strerror(result)))
1009
# TODO: May be more efficient to just treat them as sockets
1010
# throughout? But what about pipes to ssh?...
1011
to_server = self._socket.makefile('w')
1012
from_server = self._socket.makefile('r')
1013
return from_server, to_server
1015
def disconnect(self):
1016
super(SmartTCPTransport, self).disconnect()
1017
# XXX: Is closing the socket as well as closing the files really
1019
if self._socket is not None:
1020
self._socket.close()
1023
from bzrlib.transport import sftp, ssh
1024
except errors.ParamikoNotPresent:
1025
# no paramiko, no SSHTransport.
1028
class SmartSSHTransport(SmartTransport):
1029
"""Connection to smart server over SSH."""
1031
def __init__(self, url, clone_from=None):
1032
# TODO: all this probably belongs in the parent class.
1033
super(SmartSSHTransport, self).__init__(url, clone_from)
1035
if self._port is not None:
1036
self._port = int(self._port)
1037
except (ValueError, TypeError), e:
1038
raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
1040
def _connect_to_server(self):
1041
# XXX: don't hardcode vendor
1042
# XXX: cannot pass password to SSHSubprocess yet
1043
if self._password is not None:
1044
raise errors.InvalidURL("SSH smart transport doesn't handle passwords")
1045
executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
1046
vendor = ssh._get_ssh_vendor()
1047
self._ssh_connection = vendor.connect_ssh(self._username, None,
1048
self._host, self._port,
1049
command=[executable, 'serve', '--inet',
1051
return self._ssh_connection.get_filelike_channels()
1053
def disconnect(self):
1054
super(SmartSSHTransport, self).disconnect()
1055
self._ssh_connection.close()
1058
def get_test_permutations():
1059
"""Return (transport, server) permutations for testing"""
1060
return [(SmartTCPTransport, SmartTCPServer_for_testing)]