# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""RemoteTransport client for the smart-server.

This module shouldn't be accessed directly. The classes defined here should be
imported from bzrlib.smart.
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Smart-server protocol, client and server.

Requests are sent as a command and list of arguments, followed by optional
bulk body data. Responses are similarly a response and list of arguments,
followed by bulk body data. ::

    Fields are separated by Ctrl-A.
  BULK_DATA := CHUNK+ TRAILER
    Chunks can be repeated as many times as necessary.
  CHUNK := CHUNK_LEN CHUNK_BODY
  CHUNK_LEN := DIGIT+ NEWLINE
    Gives the number of bytes in the following chunk.
  CHUNK_BODY := BYTE[chunk_len]
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
  SUCCESS_TRAILER := 'done' NEWLINE

Paths are passed across the network. The client needs to see a namespace that
includes any repository that might need to be referenced, and the client needs
to know about a root directory beyond which it cannot ascend.

Servers run over ssh will typically want to be able to access any path the user
can access. Public servers on the other hand (which might be over http, ssh
or tcp) will typically want to restrict access to only a particular directory
and its children, so will want to do a software virtual root at that level.
In other words they'll want to rewrite incoming paths to be under that level
(and prevent escaping using ../ tricks).

URLs that include ~ should probably be passed across to the server verbatim
and the server can expand them. This will probably not be meaningful when
limited to a directory?
__all__ = ['RemoteTransport', 'RemoteTCPTransport', 'RemoteSSHTransport']

# TODO: A plain integer from query_version is too simple; should give some

# TODO: Server should probably catch exceptions within itself and send them
# back across the network. (But shouldn't catch KeyboardInterrupt etc)
# Also needs to somehow report protocol errors like bad requests. Need to
# consider how we'll handle error reporting, e.g. if we get halfway through a
# bulk transfer and then something goes wrong.

# TODO: Standard marker at start of request/response lines?

# TODO: Make each request and response self-validatable, e.g. with checksums.

# TODO: get/put objects could be changed to gradually read back the data as it
# comes across the network

# TODO: What should the server do if it hits an error and has to terminate?

# TODO: is it useful to allow multiple chunks in the bulk data?

# TODO: If we get an exception during transmission of bulk data we can't just
# emit the exception because it won't be seen.
# John proposes: I think it would be worthwhile to have a header on each
# chunk, that indicates it is another chunk. Then you can send an 'error'
# chunk as long as you finish the previous chunk.

# TODO: Clone method on Transport; should work up towards parent directory;
# unclear how this should be stored or communicated to the server... maybe
# just pass it on all relevant requests?

# TODO: Better name than clone() for changing between directories. How about
# open_dir or change_dir or chdir?

# TODO: Is it really good to have the notion of current directory within the
# connection? Perhaps all Transports should factor out a common connection
# from the thing that has the directory context?

# TODO: Pull more things common to sftp and ssh to a higher level.

# TODO: The server that manages a connection should be quite small and retain
# minimum state because each request is supposed to be stateless.
# Then we can write another implementation that maps to http.

# TODO: What to do when a client connection is garbage collected? Maybe just
# abruptly drop the connection?

# TODO: Server in some cases will need to restrict access to files outside of
# a particular root directory. LocalTransport doesn't do anything to stop you
# ascending above the base directory, so we need to prevent paths
# containing '..' in either the server or transport layers. (Also need to
# consider what happens if someone creates a symlink pointing outside the

# TODO: Server should rebase absolute paths coming across the network to put
# them under the virtual root, if one is in use. LocalTransport currently
# doesn't do that; if you give it an absolute path it just uses it.
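
# Illustrative sketch (not bzrlib code): one way the virtual-root rebasing
# described in the TODOs above might look. The helper name rebase_under_root
# is hypothetical. It assumes POSIX-style paths and does not resolve symlinks,
# so the symlink concern raised above would still need separate handling.

```python
import posixpath


def rebase_under_root(root, client_path):
    """Map a client-supplied path to a path under ``root``.

    Hypothetical helper. Absolute client paths are treated as relative to
    the virtual root; a path that would climb above it raises ValueError.
    """
    root = posixpath.normpath(root)
    # Strip leading slashes so an absolute path is rebased, not honoured.
    joined = posixpath.join(root, client_path.lstrip('/'))
    # normpath collapses 'a/../b' segments, exposing any '..' escape.
    resolved = posixpath.normpath(joined)
    prefix = root.rstrip('/') + '/'
    if resolved != root and not resolved.startswith(prefix):
        raise ValueError('path escapes virtual root: %r' % (client_path,))
    return resolved
```

# Usage: rebase_under_root('/srv/bzr', '/project/trunk') gives
# '/srv/bzr/project/trunk', while '../etc/passwd' raises ValueError.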

# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
# urlescape them instead. Indeed possibly this should just literally be

# FIXME: This transport, with several others, has imperfect handling of paths
# within urls. It'd probably be better for ".." from a root to raise an error
# rather than return the same directory as we do at present.

# TODO: Rather than working at the Transport layer we want Branch,
# Repository or BzrDir objects that talk to a server.

# TODO: Probably want some way for server commands to gradually produce body
# data rather than passing it as a string; they could perhaps pass an
# iterator-like callback that will gradually yield data; it probably needs a
# close() method that will always be called to do any necessary cleanup.

# TODO: Split the actual smart server from the ssh encoding of it.

# TODO: Perhaps support file-level readwrite operations over the transport

# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
# branch doing file-level operations.

# TODO: jam 20060915 _decode_tuple is acting directly on input over
# the socket, and it assumes everything is UTF8 sections separated
# by \001. Which means a request like '\002' will abort the connection
# because of a UnicodeDecodeError. It does look like invalid data will
# kill the SmartStreamServer, but only with an abort + exception, and
# the overall server shouldn't die.
25
141
from cStringIO import StringIO
27
151
from bzrlib import (
36
from bzrlib.smart import client, medium
37
from bzrlib.symbol_versioning import (
42
class _SmartStat(object):
159
from bzrlib.bundle.serializer import write_bundle
160
from bzrlib.trace import mutter
161
from bzrlib.transport import local

# must do this otherwise urllib can't parse the urls properly :(
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
    transport.register_urlparse_netloc_protocol(scheme)


def _recv_tuple(from_file):
    req_line = from_file.readline()
    return _decode_tuple(req_line)


def _decode_tuple(req_line):
    if req_line is None or req_line == '':
        # End of file: the client closed the connection.
        return None
    if req_line[-1] != '\n':
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
    return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))


def _encode_tuple(args):
    """Encode the tuple args to a bytestream."""
    return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
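
# Illustrative sketch (not bzrlib code): a round trip through the tuple wire
# format used above, written for Python 3 where the wire data is bytes. The
# names encode_tuple and decode_tuple are hypothetical stand-ins, and
# ValueError stands in for errors.SmartProtocolError.

```python
def encode_tuple(args):
    """Join text fields with Ctrl-A (0x01) and terminate with a newline."""
    return b'\x01'.join(a.encode('utf-8') for a in args) + b'\n'


def decode_tuple(req_line):
    """Inverse of encode_tuple; returns None for an empty line (EOF)."""
    if not req_line:
        return None
    if not req_line.endswith(b'\n'):
        raise ValueError('request %r not terminated' % (req_line,))
    return tuple(a.decode('utf-8') for a in req_line[:-1].split(b'\x01'))
```

# Because fields are split on 0x01 and the line ends at the first newline,
# an argument containing either byte cannot survive the round trip.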
187
class SmartProtocolBase(object):
188
"""Methods common to client and server"""
190
def _send_bulk_data(self, body):
191
"""Send chunked body data"""
192
assert isinstance(body, str)
193
bytes = ''.join(('%d\n' % len(body), body, 'done\n'))
194
self._write_and_flush(bytes)
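
# Illustrative sketch (not bzrlib code): single-chunk bulk framing as sent by
# _send_bulk_data, that is, a decimal length line, the body, then 'done'.
# The function names are hypothetical, ValueError stands in for
# errors.SmartProtocolError, and the trailer check is simplified: error
# trailers are rejected here rather than translated.

```python
import io


def send_bulk_data(out, body):
    """Write one length-prefixed chunk followed by the success trailer."""
    out.write(b'%d\n' % len(body))
    out.write(body)
    out.write(b'done\n')


def recv_bulk(inp):
    """Read one chunk written by send_bulk_data and check its trailer."""
    length_line = inp.readline()
    try:
        chunk_len = int(length_line.decode('ascii'))
    except ValueError:
        raise ValueError('bad chunk length line %r' % (length_line,))
    bulk = inp.read(chunk_len)
    if len(bulk) != chunk_len:
        raise ValueError('short read fetching bulk data chunk')
    trailer = inp.readline()
    if trailer != b'done\n':
        raise ValueError('unexpected trailer %r' % (trailer,))
    return bulk
```

# The length prefix is what lets the body itself contain newlines.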
# TODO: this only actually accommodates a single block; possibly should support
198
def _recv_bulk(self):
199
chunk_len = self._in.readline()
201
chunk_len = int(chunk_len)
203
raise errors.SmartProtocolError("bad chunk length line %r" % chunk_len)
204
bulk = self._in.read(chunk_len)
205
if len(bulk) != chunk_len:
206
raise errors.SmartProtocolError("short read fetching bulk data chunk")
210
def _recv_tuple(self):
211
return _recv_tuple(self._in)
213
def _recv_trailer(self):
214
resp = self._recv_tuple()
215
if resp == ('done', ):
218
self._translate_error(resp)
220
def _serialise_offsets(self, offsets):
221
"""Serialise a readv offset list."""
223
for start, length in offsets:
224
txt.append('%d,%d' % (start, length))
225
return '\n'.join(txt)
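
# Illustrative sketch (not bzrlib code): the readv offset list travels as one
# 'start,length' pair per line. The function names are hypothetical, and
# skipping empty lines on the decoding side is an assumption made so that an
# empty body decodes to an empty list.

```python
def serialise_offsets(offsets):
    """Encode (start, length) pairs as newline-separated 'start,length'."""
    return '\n'.join('%d,%d' % (start, length) for start, length in offsets)


def deserialise_offsets(text):
    """Decode the text produced by serialise_offsets."""
    offsets = []
    for line in text.split('\n'):
        if not line:
            # Assumed behaviour: tolerate blank lines and empty input.
            continue
        start, length = line.split(',')
        offsets.append((int(start), int(length)))
    return offsets
```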
227
def _write_and_flush(self, bytes):
228
"""Write bytes to self._out and flush it."""
229
# XXX: this will be inefficient. Just ask Robert.
230
self._out.write(bytes)
234
class SmartStreamServer(SmartProtocolBase):
235
"""Handles smart commands coming over a stream.
237
The stream may be a pipe connected to sshd, or a tcp socket, or an
238
in-process fifo for testing.
240
One instance is created for each connected client; it can serve multiple
241
requests in the lifetime of the connection.
243
The server passes requests through to an underlying backing transport,
244
which will typically be a LocalTransport looking at the server's filesystem.
247
def __init__(self, in_file, out_file, backing_transport):
248
"""Construct new server.
250
:param in_file: Python file from which requests can be read.
251
:param out_file: Python file to write responses.
252
:param backing_transport: Transport for the directory served.
256
self.smart_server = SmartServer(backing_transport)
257
# server can call back to us to get bulk data - this is not really
258
# ideal, they should get it per request instead
259
self.smart_server._recv_body = self._recv_bulk
261
def _recv_tuple(self):
262
"""Read a request from the client and return as a tuple.
264
Returns None at end of file (if the client closed the connection.)
266
return _recv_tuple(self._in)
268
def _send_tuple(self, args):
269
"""Send response header"""
270
return self._write_and_flush(_encode_tuple(args))
272
def _send_error_and_disconnect(self, exception):
273
self._send_tuple(('error', str(exception)))
277
def _serve_one_request(self):
278
"""Read one request from input, process, send back a response.
280
:return: False if the server should terminate, otherwise None.
282
req_args = self._recv_tuple()
284
# client closed connection
285
return False # shutdown server
287
response = self.smart_server.dispatch_command(req_args[0], req_args[1:])
288
self._send_tuple(response.args)
289
if response.body is not None:
290
self._send_bulk_data(response.body)
291
except KeyboardInterrupt:
294
# everything else: pass to client, flush, and quit
295
self._send_error_and_disconnect(e)
299
"""Serve requests until the client disconnects."""
300
# Keep a reference to stderr because the sys module's globals get set to
301
# None during interpreter shutdown.
302
from sys import stderr
304
while self._serve_one_request() != False:
307
stderr.write("%s terminating on exception %s\n" % (self, e))
311
class SmartServerResponse(object):
312
"""Response generated by SmartServer."""
314
def __init__(self, args, body=None):
# XXX: TODO: Create a SmartServerRequest which will take the responsibility
# for delivering the data for a request. This could be done with the
# StreamServer, though that would conflate request and response,
# which may be undesirable.
324
class SmartServer(object):
325
"""Protocol logic for smart server.
327
This doesn't handle serialization at all, it just processes requests and
331
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServer not contain
332
# encoding or decoding logic to allow the wire protocol to vary from the
333
# object protocol: we will want to tweak the wire protocol separate from
334
# the object model, and ideally we will be able to do that without having
335
# a SmartServer subclass for each wire protocol, rather just a Protocol
338
# TODO: Better way of representing the body for commands that take it,
339
# and allow it to be streamed into the server.
341
def __init__(self, backing_transport):
342
self._backing_transport = backing_transport
345
"""Answer a version request with my version."""
346
return SmartServerResponse(('ok', '1'))
348
def do_has(self, relpath):
349
r = self._backing_transport.has(relpath) and 'yes' or 'no'
350
return SmartServerResponse((r,))
352
def do_get(self, relpath):
353
backing_bytes = self._backing_transport.get_bytes(relpath)
354
return SmartServerResponse(('ok',), backing_bytes)
356
def _deserialise_optional_mode(self, mode):
357
# XXX: FIXME this should be on the protocol object.
363
def do_append(self, relpath, mode):
364
old_length = self._backing_transport.append_bytes(
365
relpath, self._recv_body(), self._deserialise_optional_mode(mode))
366
return SmartServerResponse(('appended', '%d' % old_length))
368
def do_delete(self, relpath):
369
self._backing_transport.delete(relpath)
371
def do_iter_files_recursive(self, abspath):
372
# XXX: the path handling needs some thought.
373
#relpath = self._backing_transport.relpath(abspath)
374
transport = self._backing_transport.clone(abspath)
375
filenames = transport.iter_files_recursive()
376
return SmartServerResponse(('names',) + tuple(filenames))
378
def do_list_dir(self, relpath):
379
filenames = self._backing_transport.list_dir(relpath)
380
return SmartServerResponse(('names',) + tuple(filenames))
382
def do_mkdir(self, relpath, mode):
383
self._backing_transport.mkdir(relpath,
384
self._deserialise_optional_mode(mode))
386
def do_move(self, rel_from, rel_to):
387
self._backing_transport.move(rel_from, rel_to)
389
def do_put(self, relpath, mode):
390
self._backing_transport.put_bytes(relpath,
392
self._deserialise_optional_mode(mode))
394
def _deserialise_offsets(self, text):
395
# XXX: FIXME this should be on the protocol object.
397
for line in text.split('\n'):
400
start, length = line.split(',')
401
offsets.append((int(start), int(length)))
404
def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
405
create_parent_dir = (create_parent == 'T')
406
self._backing_transport.put_bytes_non_atomic(relpath,
408
mode=self._deserialise_optional_mode(mode),
409
create_parent_dir=create_parent_dir,
410
dir_mode=self._deserialise_optional_mode(dir_mode))
412
def do_readv(self, relpath):
413
offsets = self._deserialise_offsets(self._recv_body())
414
backing_bytes = ''.join(bytes for offset, bytes in
415
self._backing_transport.readv(relpath, offsets))
416
return SmartServerResponse(('readv',), backing_bytes)
418
def do_rename(self, rel_from, rel_to):
419
self._backing_transport.rename(rel_from, rel_to)
421
def do_rmdir(self, relpath):
422
self._backing_transport.rmdir(relpath)
424
def do_stat(self, relpath):
425
stat = self._backing_transport.stat(relpath)
426
return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
428
def do_get_bundle(self, path, revision_id):
429
# open transport relative to our base
430
t = self._backing_transport.clone(path)
431
control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
432
repo = control.open_repository()
433
tmpf = tempfile.TemporaryFile()
434
base_revision = revision.NULL_REVISION
435
write_bundle(repo, revision_id, base_revision, tmpf)
437
return SmartServerResponse((), tmpf.read())
439
def dispatch_command(self, cmd, args):
440
func = getattr(self, 'do_' + cmd, None)
442
raise errors.SmartProtocolError("bad request %r" % (cmd,))
446
result = SmartServerResponse(('ok',))
448
except errors.NoSuchFile, e:
449
return SmartServerResponse(('NoSuchFile', e.path))
450
except errors.FileExists, e:
451
return SmartServerResponse(('FileExists', e.path))
452
except errors.DirectoryNotEmpty, e:
453
return SmartServerResponse(('DirectoryNotEmpty', e.path))
454
except errors.ShortReadvError, e:
455
return SmartServerResponse(('ShortReadvError',
456
e.path, str(e.offset), str(e.length), str(e.actual)))
457
except UnicodeError, e:
# If it is a DecodeError, then most likely we are starting
# with a plain string
460
str_or_unicode = e.object
461
if isinstance(str_or_unicode, unicode):
462
val = u'u:' + str_or_unicode
464
val = u's:' + str_or_unicode.encode('base64')
465
# This handles UnicodeEncodeError or UnicodeDecodeError
466
return SmartServerResponse((e.__class__.__name__,
467
e.encoding, val, str(e.start), str(e.end), e.reason))
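
# Illustrative sketch (not bzrlib code): the dispatch pattern used by
# SmartServer, reduced to a dict-backed store. The verb is looked up as a
# 'do_<verb>' method, a None result becomes ('ok',), and known exceptions are
# turned into response tuples instead of propagating. All class names here
# are hypothetical, and the None-to-('ok',) step is inferred from the
# surviving lines of dispatch_command.

```python
class NoSuchFile(Exception):
    def __init__(self, path):
        Exception.__init__(self, path)
        self.path = path


class Response(object):
    def __init__(self, args, body=None):
        self.args = args
        self.body = body


class Handler(object):
    def __init__(self, files):
        self._files = files

    def do_has(self, relpath):
        return Response(('yes',) if relpath in self._files else ('no',))

    def do_get(self, relpath):
        if relpath not in self._files:
            raise NoSuchFile(relpath)
        return Response(('ok',), self._files[relpath])

    def do_delete(self, relpath):
        if relpath not in self._files:
            raise NoSuchFile(relpath)
        del self._files[relpath]  # returns None; dispatch answers ('ok',)

    def dispatch_command(self, cmd, args):
        func = getattr(self, 'do_' + cmd, None)
        if func is None:
            raise ValueError('bad request %r' % (cmd,))
        try:
            result = func(*args)
            if result is None:
                result = Response(('ok',))
            return result
        except NoSuchFile as e:
            return Response(('NoSuchFile', e.path))
```

# Keeping encoding out of the handler means the same class could sit behind
# a different wire protocol, which is the point made in the class comment.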
470
class SmartTCPServer(object):
471
"""Listens on a TCP socket and accepts connections from smart clients"""
473
def __init__(self, backing_transport=None, host='127.0.0.1', port=0):
474
"""Construct a new server.
476
To actually start it running, call either start_background_thread or
479
:param host: Name of the interface to listen on.
480
:param port: TCP port to listen on, or 0 to allocate a transient port.
482
if backing_transport is None:
483
backing_transport = memory.MemoryTransport()
484
self._server_socket = socket.socket()
485
self._server_socket.bind((host, port))
486
self.port = self._server_socket.getsockname()[1]
487
self._server_socket.listen(1)
488
self._server_socket.settimeout(1)
489
self.backing_transport = backing_transport
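
# Illustrative sketch (not bzrlib code): binding to port 0 asks the operating
# system for a free transient port, which getsockname() then reports. The
# function names are hypothetical; the socket calls mirror the constructor
# above.

```python
import socket


def bind_transient(host='127.0.0.1'):
    """Return a listening socket on an OS-assigned port."""
    server_socket = socket.socket()
    server_socket.bind((host, 0))
    server_socket.listen(1)
    # A short timeout lets an accept loop wake up and check a stop flag.
    server_socket.settimeout(1)
    return server_socket


def server_url(server_socket):
    """Format the bound address the way get_url does."""
    return 'bzr://%s:%d/' % server_socket.getsockname()
```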
492
# let connections timeout so that we get a chance to terminate
493
# Keep a reference to the exceptions we want to catch because the socket
494
# module's globals get set to None during interpreter shutdown.
495
from socket import timeout as socket_timeout
496
from socket import error as socket_error
497
self._should_terminate = False
498
while not self._should_terminate:
500
self.accept_and_serve()
501
except socket_timeout:
502
# just check if we're asked to stop
504
except socket_error, e:
505
trace.warning("client disconnected: %s", e)
509
"""Return the url of the server"""
510
return "bzr://%s:%d/" % self._server_socket.getsockname()
512
def accept_and_serve(self):
513
conn, client_addr = self._server_socket.accept()
514
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
515
from_client = conn.makefile('r')
516
to_client = conn.makefile('w')
517
handler = SmartStreamServer(from_client, to_client,
518
self.backing_transport)
519
connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
520
connection_thread.setDaemon(True)
521
connection_thread.start()
523
def start_background_thread(self):
524
self._server_thread = threading.Thread(None,
526
name='server-' + self.get_url())
527
self._server_thread.setDaemon(True)
528
self._server_thread.start()
530
def stop_background_thread(self):
531
self._should_terminate = True
532
# self._server_socket.close()
533
# we used to join the thread, but it's not really necessary; it will
535
## self._server_thread.join()
538
class SmartTCPServer_for_testing(SmartTCPServer):
539
"""Server suitable for use by transport tests.
541
This server is backed by the process's cwd.
545
self._homedir = os.getcwd()
# The server is set up by default like for ssh access: the client
# passes filesystem-absolute paths; therefore the server must look
# them up relative to the root directory. It might be better to act
# as a public server and have the server rewrite paths into the test
551
SmartTCPServer.__init__(self, transport.get_transport("file:///"))
554
"""Set up server for testing"""
555
self.start_background_thread()
558
self.stop_background_thread()
561
"""Return the url of the server"""
562
host, port = self._server_socket.getsockname()
563
# XXX: I think this is likely to break on windows -- self._homedir will
564
# have backslashes (and maybe a drive letter?).
565
# -- Andrew Bennetts, 2006-08-29
566
return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
568
def get_bogus_url(self):
569
"""Return a URL which will fail to connect"""
570
return 'bzr://127.0.0.1:1/'
573
class SmartStat(object):
44
575
def __init__(self, size, mode):
45
576
self.st_size = size
46
577
self.st_mode = mode
49
class RemoteTransport(transport.ConnectedTransport):
580
class SmartTransport(transport.Transport):
50
581
"""Connection to a smart server.
52
The connection holds references to the medium that can be used to send
53
requests to the server.
583
The connection holds references to pipes that can be used to send requests
55
586
The connection has a notion of the current directory to which it's
56
587
connected; this is incorporated in filenames passed to the server.
58
This supports some higher-level RPC operations and can also be treated
589
This supports some higher-level RPC operations and can also be treated
59
590
like a Transport to do file-like operations.
61
The connection can be made over a tcp socket, an ssh pipe or a series of
62
http requests. There are concrete subclasses for each type:
63
RemoteTCPTransport, etc.
592
The connection can be made over a tcp socket, or (in future) an ssh pipe
593
or a series of http requests. There are concrete subclasses for each
594
type: SmartTCPTransport, etc.
66
# When making a readv request, cap it at requesting 5MB of data
67
_max_readv_bytes = 5*1024*1024
69
# IMPORTANT FOR IMPLEMENTORS: RemoteTransport MUST NOT be given encoding
597
# IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
70
598
# responsibilities: Put those on SmartClient or similar. This is vital for
71
599
# the ability to support multiple versions of the smart protocol over time:
72
# RemoteTransport is an adapter from the Transport object model to the
600
# SmartTransport is an adapter from the Transport object model to the
73
601
# SmartClient model, not an encoder.
# FIXME: the medium parameter should be private, only the tests require
# it. It may be even clearer to define a TestRemoteTransport that handles
# the specific cases of providing a _client and/or a _medium, and leave
# RemoteTransport as an abstract class.
79
def __init__(self, url, _from_transport=None, medium=None, _client=None):
603
def __init__(self, url, clone_from=None, client=None):
82
:param _from_transport: Another RemoteTransport instance that this
83
one is being cloned from. Attributes such as the medium will
86
:param medium: The medium to use for this RemoteTransport. If None,
87
the medium from the _from_transport is shared. If both this
88
and _from_transport are None, a new medium will be built.
89
_from_transport and medium cannot both be specified.
91
:param _client: Override the _SmartClient used by this transport. This
92
should only be used for testing purposes; normally this is
93
determined from the medium.
95
super(RemoteTransport, self).__init__(
96
url, _from_transport=_from_transport)
98
# The medium is the connection, except when we need to share it with
99
# other objects (RemoteBzrDir, RemoteRepository etc). In these cases
100
# what we want to share is really the shared connection.
102
if (_from_transport is not None
103
and isinstance(_from_transport, RemoteTransport)):
104
_client = _from_transport._client
105
elif _from_transport is None:
# If no _from_transport is specified, we need to initialize the
110
medium, credentials = self._build_medium()
111
if 'hpss' in debug.debug_flags:
112
trace.mutter('hpss: Built a new medium: %s',
113
medium.__class__.__name__)
114
self._shared_connection = transport._SharedConnection(medium,
118
# No medium was specified, so share the medium from the
120
medium = self._shared_connection.connection
raise AssertionError(
    "Both _from_transport (%r) and medium (%r) passed to "
    "RemoteTransport.__init__, but these parameters are mutually "
    "exclusive." % (_from_transport, medium))
128
self._client = client._SmartClient(medium)
130
self._client = _client
132
def _build_medium(self):
133
"""Create the medium if _from_transport does not provide one.
135
The medium is analogous to the connection for ConnectedTransport: it
136
allows connection sharing.
141
def _report_activity(self, bytes, direction):
142
"""See Transport._report_activity.
144
Does nothing; the smart medium will report activity triggered by a
606
:param client: ignored when clone_from is not None.
608
### Technically super() here is faulty because Transport's __init__
609
### fails to take 2 parameters, and if super were to choose a silly
610
### initialisation order things would blow up.
611
if not url.endswith('/'):
613
super(SmartTransport, self).__init__(url)
614
self._scheme, self._username, self._password, self._host, self._port, self._path = \
615
transport.split_url(url)
616
if clone_from is None:
618
self._client = SmartStreamClient(self._connect_to_server)
620
self._client = client
622
# credentials may be stripped from the base in some circumstances
623
# as yet to be clearly defined or documented, so copy them.
624
self._username = clone_from._username
625
# reuse same connection
626
self._client = clone_from._client
628
def abspath(self, relpath):
629
"""Return the full url to the given relative path.
631
@param relpath: the relative path or path components
632
@type relpath: str or list
634
return self._unparse_url(self._remote_path(relpath))
636
def clone(self, relative_url):
637
"""Make a new SmartTransport related to me, sharing the same connection.
639
This essentially opens a handle on a different remote directory.
641
if relative_url is None:
642
return self.__class__(self.base, self)
644
return self.__class__(self.abspath(relative_url), self)
149
646
def is_readonly(self):
150
647
"""Smart server transport can do read/write file operations."""
152
resp = self._call2('Transport.is_readonly')
153
except errors.UnknownSmartMethod:
154
# XXX: nasty hack: servers before 0.16 don't have a
155
# 'Transport.is_readonly' verb, so we do what clients before 0.16
158
if resp == ('yes', ):
160
elif resp == ('no', ):
163
raise errors.UnexpectedSmartServerResponse(resp)
165
650
def get_smart_client(self):
166
return self._get_connection()
653
def _unparse_url(self, path):
654
"""Return URL for a path.
168
def get_smart_medium(self):
169
return self._get_connection()
656
:see: SFTPUrlHandling._unparse_url
658
# TODO: Eventually it should be possible to unify this with
659
# SFTPUrlHandling._unparse_url?
662
path = urllib.quote(path)
663
netloc = urllib.quote(self._host)
664
if self._username is not None:
665
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
666
if self._port is not None:
667
netloc = '%s:%d' % (netloc, self._port)
668
return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
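
# Illustrative sketch (not bzrlib code): the same URL assembly as
# _unparse_url, written against Python 3's urllib.parse instead of the
# Python 2 urllib and urlparse modules used above. The standalone function
# and its parameter names are hypothetical.

```python
from urllib.parse import quote, urlunparse


def unparse_url(scheme, host, path, username=None, port=None):
    """Build 'scheme://[user@]host[:port]/path' with quoted components."""
    netloc = quote(host)
    if username is not None:
        netloc = '%s@%s' % (quote(username), netloc)
    if port is not None:
        netloc = '%s:%d' % (netloc, port)
    # quote() leaves '/' alone by default, so path separators survive.
    return urlunparse((scheme, netloc, quote(path), '', '', ''))
```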
171
670
def _remote_path(self, relpath):
172
671
"""Returns the Unicode version of the absolute path for relpath."""
173
672
return self._combine_paths(self._path, relpath)
175
def _call(self, method, *args):
176
resp = self._call2(method, *args)
177
self._ensure_ok(resp)
179
def _call2(self, method, *args):
180
"""Call a method on the remote server."""
182
return self._client.call(method, *args)
183
except errors.ErrorFromSmartServer, err:
184
# The first argument, if present, is always a path.
186
context = {'relpath': args[0]}
189
self._translate_error(err, **context)
191
def _call_with_body_bytes(self, method, args, body):
192
"""Call a method on the remote server with body bytes."""
194
return self._client.call_with_body_bytes(method, args, body)
195
except errors.ErrorFromSmartServer, err:
196
# The first argument, if present, is always a path.
198
context = {'relpath': args[0]}
201
self._translate_error(err, **context)
203
674
def has(self, relpath):
204
675
"""Indicate whether a remote file of the given name exists or not.
206
677
:see: Transport.has()
208
resp = self._call2('has', self._remote_path(relpath))
679
resp = self._client._call('has', self._remote_path(relpath))
209
680
if resp == ('yes', ):
211
682
elif resp == ('no', ):
214
raise errors.UnexpectedSmartServerResponse(resp)
685
self._translate_error(resp)
216
687
def get(self, relpath):
217
688
"""Return file-like object reading the contents of a remote file.
219
690
:see: Transport.get_bytes()/get_file()
221
return StringIO(self.get_bytes(relpath))
223
def get_bytes(self, relpath):
224
692
remote = self._remote_path(relpath)
226
resp, response_handler = self._client.call_expecting_body('get', remote)
227
except errors.ErrorFromSmartServer, err:
228
self._translate_error(err, relpath)
693
resp = self._client._call('get', remote)
229
694
if resp != ('ok', ):
230
response_handler.cancel_read_body()
231
raise errors.UnexpectedSmartServerResponse(resp)
232
return response_handler.read_body_bytes()
695
self._translate_error(resp, relpath)
696
return StringIO(self._client._recv_bulk())
234
698
def _serialise_optional_mode(self, mode):
301
754
def append_file(self, relpath, from_file, mode=None):
302
755
return self.append_bytes(relpath, from_file.read(), mode)
304
757
def append_bytes(self, relpath, bytes, mode=None):
305
resp = self._call_with_body_bytes(
758
resp = self._client._call_with_upload(
307
760
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
309
762
if resp[0] == 'appended':
310
763
return int(resp[1])
311
raise errors.UnexpectedSmartServerResponse(resp)
764
self._translate_error(resp)
313
766
def delete(self, relpath):
314
resp = self._call2('delete', self._remote_path(relpath))
315
self._ensure_ok(resp)
317
def external_url(self):
318
"""See bzrlib.transport.Transport.external_url."""
319
# the external path for RemoteTransports is the base
322
def recommended_page_size(self):
323
"""Return the recommended page size for this transport."""
326
def _readv(self, relpath, offsets):
767
resp = self._client._call('delete', self._remote_path(relpath))
768
self._translate_error(resp)
770
def readv(self, relpath, offsets):
330
774
offsets = list(offsets)
332
776
sorted_offsets = sorted(offsets)
777
# turn the list of offsets into a stack
778
offset_stack = iter(offsets)
779
cur_offset_and_size = offset_stack.next()
333
780
coalesced = list(self._coalesce_offsets(sorted_offsets,
334
781
limit=self._max_readv_combine,
335
fudge_factor=self._bytes_to_read_before_seek,
336
max_size=self._max_readv_bytes))
# now that we've coalesced things, avoid making enormous requests
343
if c.length + cur_len > self._max_readv_bytes:
344
requests.append(cur_request)
348
cur_request.append(c)
351
requests.append(cur_request)
352
if 'hpss' in debug.debug_flags:
353
trace.mutter('%s.readv %s offsets => %s coalesced'
354
' => %s requests (%s)',
355
self.__class__.__name__, len(offsets), len(coalesced),
356
len(requests), sum(map(len, requests)))
782
fudge_factor=self._bytes_to_read_before_seek))
785
resp = self._client._call_with_upload(
787
(self._remote_path(relpath),),
788
self._client._serialise_offsets((c.start, c.length) for c in coalesced))
790
if resp[0] != 'readv':
791
# This should raise an exception
792
self._translate_error(resp)
795
data = self._client._recv_bulk()
357
796
# Cache the results, but only until they have been fulfilled
359
# turn the list of offsets into a single stack to iterate
360
offset_stack = iter(offsets)
361
# using a list so it can be modified when passing down and coming back
362
next_offset = [offset_stack.next()]
363
for cur_request in requests:
365
result = self._client.call_with_body_readv_array(
366
('readv', self._remote_path(relpath),),
367
[(c.start, c.length) for c in cur_request])
368
resp, response_handler = result
369
except errors.ErrorFromSmartServer, err:
370
self._translate_error(err, relpath)
372
if resp[0] != 'readv':
373
# This should raise an exception
374
response_handler.cancel_read_body()
375
raise errors.UnexpectedSmartServerResponse(resp)
377
for res in self._handle_response(offset_stack, cur_request,
383
def _handle_response(self, offset_stack, coalesced, response_handler,
384
data_map, next_offset):
385
cur_offset_and_size = next_offset[0]
386
# FIXME: this should know how many bytes are needed, for clarity.
387
data = response_handler.read_body_bytes()
389
798
for c_offset in coalesced:
390
799
if len(data) < c_offset.length:
391
800
raise errors.ShortReadvError(relpath, c_offset.start,
392
801
c_offset.length, actual=len(data))
393
802
for suboffset, subsize in c_offset.ranges:
394
803
key = (c_offset.start+suboffset, subsize)
395
this_data = data[data_offset+suboffset:
396
data_offset+suboffset+subsize]
397
# Special case when the data is in-order, rather than packing
398
# into a map and then back out again. Benchmarking shows that
399
# this has 100% hit rate, but leave in the data_map work just
401
# TODO: Could we get away with using buffer() to avoid the
402
# memory copy? Callers would need to realize they may
403
# not have a real string.
404
if key == cur_offset_and_size:
405
yield cur_offset_and_size[0], this_data
406
cur_offset_and_size = next_offset[0] = offset_stack.next()
408
data_map[key] = this_data
409
data_offset += c_offset.length
804
data_map[key] = data[suboffset:suboffset+subsize]
805
data = data[c_offset.length:]
411
807
# Now that we've read some data, see if we can yield anything back
412
808
while cur_offset_and_size in data_map:
413
809
this_data = data_map.pop(cur_offset_and_size)
414
810
yield cur_offset_and_size[0], this_data
415
cur_offset_and_size = next_offset[0] = offset_stack.next()
811
cur_offset_and_size = offset_stack.next()
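The reordering that `_handle_response` performs can be sketched in isolation. This is a minimal Python 3 illustration, not the bzrlib implementation: `Coalesced` and `yield_in_request_order` are hypothetical names, and the whole response body is assumed to already be in memory as one byte string.

```python
from collections import namedtuple

# Hypothetical stand-in for the coalesced-offset objects used above:
# `start` is the file offset, `length` the bytes read for this chunk, and
# `ranges` the (suboffset, subsize) pieces the caller actually asked for.
Coalesced = namedtuple("Coalesced", "start length ranges")


def yield_in_request_order(offsets, coalesced, data):
    """Yield (start, bytes) pairs in the order given by `offsets`."""
    offset_iter = iter(offsets)
    wanted = next(offset_iter, None)
    data_map = {}      # pieces that arrived before the caller wanted them
    data_offset = 0    # position of the current chunk inside `data`
    for c in coalesced:
        for suboffset, subsize in c.ranges:
            key = (c.start + suboffset, subsize)
            begin = data_offset + suboffset
            data_map[key] = data[begin:begin + subsize]
        data_offset += c.length
        # Now that more data is available, hand back whatever is next in line.
        while wanted is not None and wanted in data_map:
            yield wanted[0], data_map.pop(wanted)
            wanted = next(offset_iter, None)
```

Buffering in `data_map` is only needed when the caller's order differs from the order of the coalesced reads; the in-order fast path in the newer code avoids the dictionary round trip.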
417
813
def rename(self, rel_from, rel_to):
419
815
self._remote_path(rel_from),
420
816
self._remote_path(rel_to))
422
818
def move(self, rel_from, rel_to):
424
820
self._remote_path(rel_from),
425
821
self._remote_path(rel_to))
427
823
def rmdir(self, relpath):
428
824
resp = self._call('rmdir', self._remote_path(relpath))
430
def _ensure_ok(self, resp):
432
raise errors.UnexpectedSmartServerResponse(resp)
434
def _translate_error(self, err, relpath=None):
435
remote._translate_error(err, path=relpath)
826
def _call(self, method, *args):
827
resp = self._client._call(method, *args)
828
self._translate_error(resp)
830
def _translate_error(self, resp, orig_path=None):
831
"""Raise an exception from a response"""
835
elif what == 'NoSuchFile':
836
if orig_path is not None:
837
error_path = orig_path
840
raise errors.NoSuchFile(error_path)
841
elif what == 'error':
842
raise errors.SmartProtocolError(unicode(resp[1]))
843
elif what == 'FileExists':
844
raise errors.FileExists(resp[1])
845
elif what == 'DirectoryNotEmpty':
846
raise errors.DirectoryNotEmpty(resp[1])
847
elif what == 'ShortReadvError':
848
raise errors.ShortReadvError(resp[1], int(resp[2]),
849
int(resp[3]), int(resp[4]))
850
elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
851
encoding = str(resp[1]) # encoding must always be a string
855
reason = str(resp[5]) # reason must always be a string
856
if val.startswith('u:'):
858
elif val.startswith('s:'):
859
val = val[2:].decode('base64')
860
if what == 'UnicodeDecodeError':
861
raise UnicodeDecodeError(encoding, val, start, end, reason)
862
elif what == 'UnicodeEncodeError':
863
raise UnicodeEncodeError(encoding, val, start, end, reason)
865
raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
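The older `_translate_error` maps the first element of a response tuple to an exception. A reduced Python 3 sketch of that dispatch follows. The exception classes here are local stand-ins rather than the ones in `bzrlib.errors`, and the early return for an `'ok'` response is an assumption, because that branch is not visible in the lines above.

```python
class NoSuchFile(Exception):
    """Stand-in for errors.NoSuchFile."""


class FileExists(Exception):
    """Stand-in for errors.FileExists."""


class SmartProtocolError(Exception):
    """Stand-in for errors.SmartProtocolError."""


def translate_error(resp, orig_path=None):
    """Raise an exception matching an error response tuple."""
    what = resp[0]
    if what == "ok":
        # Assumed: a successful response is not an error.
        return None
    if what == "NoSuchFile":
        # Prefer the path the caller used over the server-side path.
        raise NoSuchFile(orig_path if orig_path is not None else resp[1])
    if what == "FileExists":
        raise FileExists(resp[1])
    if what == "error":
        raise SmartProtocolError(str(resp[1]))
    raise SmartProtocolError("unexpected smart server error: %r" % (resp,))
```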
    def _send_tuple(self, args):
        self._client._send_tuple(args)

    def _recv_tuple(self):
        return self._client._recv_tuple()
437
873
def disconnect(self):
438
self.get_smart_medium().disconnect()
874
self._client.disconnect()
876
def delete_tree(self, relpath):
877
raise errors.TransportNotPossible('readonly transport')
440
879
def stat(self, relpath):
441
resp = self._call2('stat', self._remote_path(relpath))
880
resp = self._client._call('stat', self._remote_path(relpath))
442
881
if resp[0] == 'stat':
443
return _SmartStat(int(resp[1]), int(resp[2], 8))
444
raise errors.UnexpectedSmartServerResponse(resp)
882
return SmartStat(int(resp[1]), int(resp[2], 8))
884
self._translate_error(resp)
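Both versions of `stat` parse the response the same way: the size is a decimal string and the mode is an octal string. A small Python 3 sketch of that parsing, where `parse_stat_response` is a hypothetical helper and `ValueError` stands in for the bzrlib error types:

```python
import stat


def parse_stat_response(resp):
    """Return (size, mode) from a ('stat', size, octal_mode) tuple."""
    if resp[0] != "stat":
        raise ValueError("unexpected response: %r" % (resp,))
    size = int(resp[1])       # decimal byte count
    mode = int(resp[2], 8)    # mode travels as an octal string
    return size, mode


size, mode = parse_stat_response(("stat", "1024", "100644"))
is_regular_file = stat.S_ISREG(mode)
```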
446
886
## def lock_read(self, relpath):
447
887
## """Lock the given file for shared (read) access.
462
902
def list_dir(self, relpath):
463
resp = self._call2('list_dir', self._remote_path(relpath))
903
resp = self._client._call('list_dir',
904
self._remote_path(relpath))
464
905
if resp[0] == 'names':
465
906
return [name.encode('ascii') for name in resp[1:]]
466
raise errors.UnexpectedSmartServerResponse(resp)
908
self._translate_error(resp)
468
910
def iter_files_recursive(self):
469
resp = self._call2('iter_files_recursive', self._remote_path(''))
911
resp = self._client._call('iter_files_recursive',
912
self._remote_path(''))
470
913
if resp[0] == 'names':
472
raise errors.UnexpectedSmartServerResponse(resp)
class RemoteTCPTransport(RemoteTransport):
    """Connection to smart server over plain tcp.

    This is essentially just a factory to get 'RemoteTransport(url,
        SmartTCPClientMedium)'.
    """

    def _build_medium(self):
        client_medium = medium.SmartTCPClientMedium(
            self._host, self._port, self.base)
        return client_medium, None
class RemoteTCPTransportV2Only(RemoteTransport):
    """Connection to smart server over plain tcp with the client hard-coded to
    assume protocol v2 and remote server version <= 1.6.

    This should only be used for testing.
    """

    def _build_medium(self):
        client_medium = medium.SmartTCPClientMedium(
            self._host, self._port, self.base)
        client_medium._protocol_version = 2
        client_medium._remember_remote_is_before((1, 6))
        return client_medium, None
503
class RemoteSSHTransport(RemoteTransport):
504
"""Connection to smart server over SSH.
506
This is essentially just a factory to get 'RemoteTransport(url,
507
SmartSSHClientMedium)'.
510
def _build_medium(self):
511
location_config = config.LocationConfig(self.base)
512
bzr_remote_path = location_config.get_bzr_remote_path()
515
auth = config.AuthenticationConfig()
516
user = auth.get_user('ssh', self._host, self._port)
517
client_medium = medium.SmartSSHClientMedium(self._host, self._port,
518
user, self._password, self.base,
519
bzr_remote_path=bzr_remote_path)
520
return client_medium, (user, self._password)
class RemoteHTTPTransport(RemoteTransport):
    """Just a way to connect between a bzr+http:// url and http://.

    This connection operates slightly differently than the RemoteSSHTransport.
    It uses a plain http:// transport underneath, which defines what remote
    .bzr/smart URL we are connected to. From there, all paths that are sent are
    sent as relative paths; this way, the remote side can properly
    de-reference them, since it is likely doing rewrite rules to translate an
    HTTP path into a local path.
    """
534
def __init__(self, base, _from_transport=None, http_transport=None):
535
if http_transport is None:
536
# FIXME: the password may be lost here because it appears in the
537
# url only for an initial construction (when the url came from the
539
http_url = base[len('bzr+'):]
540
self._http_transport = transport.get_transport(http_url)
542
self._http_transport = http_transport
543
super(RemoteHTTPTransport, self).__init__(
544
base, _from_transport=_from_transport)
    def _build_medium(self):
        # We let http_transport take care of the credentials
        return self._http_transport.get_smart_medium(), None
550
def _remote_path(self, relpath):
551
"""After connecting, HTTP Transport only deals in relative URLs."""
552
# Adjust the relpath based on which URL this smart transport is
554
http_base = urlutils.normalize_url(self.get_smart_medium().base)
555
url = urlutils.join(self.base[len('bzr+'):], relpath)
556
url = urlutils.normalize_url(url)
557
return urlutils.relative_url(http_base, url)
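The path arithmetic in `_remote_path` can be illustrated with the standard library alone. This Python 3 sketch is an approximation under stated assumptions: `remote_path` is a hypothetical free function, both URLs are assumed to share scheme and host, and `posixpath` is used in place of `urlutils.normalize_url` and `urlutils.relative_url`, whose exact behaviour is not reproduced here.

```python
import posixpath
from urllib.parse import urlsplit


def remote_path(medium_base, transport_base, relpath):
    """Return relpath expressed relative to the medium's base URL.

    medium_base is the http:// URL the smart medium posts to, and
    transport_base is the bzr+http:// URL of the transport itself.
    """
    http_url = transport_base[len("bzr+"):]
    target = posixpath.join(urlsplit(http_url).path, relpath)
    base = urlsplit(medium_base).path
    # normpath collapses '..' segments and trailing slashes before comparing.
    return posixpath.relpath(posixpath.normpath(target),
                             posixpath.normpath(base))


example = remote_path("http://host/repo/",
                      "bzr+http://host/repo/branch/",
                      "file")
```

Here `example` is `'branch/file'`: the cloned transport lives below the URL the medium was opened on, so the request carries the extra `branch/` component.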
559
def clone(self, relative_url):
560
"""Make a new RemoteHTTPTransport related to me.
562
This is re-implemented rather than using the default
563
RemoteTransport.clone() because we must be careful about the underlying
566
Also, the cloned smart transport will POST to the same .bzr/smart
567
location as this transport (although obviously the relative paths in the
568
smart requests may be different). This is so that the server doesn't
569
have to handle .bzr/smart requests at arbitrary places inside .bzr
570
directories, just at the initial URL the user uses.
916
self._translate_error(resp)
919
class SmartStreamClient(SmartProtocolBase):
920
"""Connection to smart server over two streams"""
922
def __init__(self, connect_func):
923
self._connect_func = connect_func
924
self._connected = False
    def _ensure_connection(self):
        if not self._connected:
            self._in, self._out = self._connect_func()
            self._connected = True

    def _send_tuple(self, args):
        self._ensure_connection()
        return self._write_and_flush(_encode_tuple(args))

    def _send_bulk_data(self, body):
        self._ensure_connection()
        SmartProtocolBase._send_bulk_data(self, body)

    def _recv_bulk(self):
        self._ensure_connection()
        return SmartProtocolBase._recv_bulk(self)

    def _recv_tuple(self):
        self._ensure_connection()
        return SmartProtocolBase._recv_tuple(self)

    def _recv_trailer(self):
        self._ensure_connection()
        return SmartProtocolBase._recv_trailer(self)
954
def disconnect(self):
955
"""Close connection to the server"""
960
def _call(self, *args):
961
self._send_tuple(args)
962
return self._recv_tuple()
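`SmartStreamClient` defers connecting until the first request is sent or received. The pattern can be shown on its own in Python 3. `LazyClient` and its `send` method are hypothetical names for illustration; only the `connect_func` contract (a callable returning an input and an output stream) is taken from the code above.

```python
import io


class LazyClient:
    """Open the connection on first use rather than at construction."""

    def __init__(self, connect_func):
        self._connect_func = connect_func
        self._connected = False

    def _ensure_connection(self):
        if not self._connected:
            # connect_func returns (from_server, to_server) file-like objects.
            self._in, self._out = self._connect_func()
            self._connected = True

    def send(self, data):
        self._ensure_connection()
        self._out.write(data)
        self._out.flush()


calls = []
to_server = io.BytesIO()


def fake_connect():
    calls.append("connect")
    return io.BytesIO(), to_server


client = LazyClient(fake_connect)
connected_before_send = list(calls)
client.send(b"hello\n")
client.send(b"hello\n")
```

Constructing the client costs nothing; `fake_connect` runs once, on the first `send`, and later calls reuse the same streams.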
964
def _call_with_upload(self, method, args, body):
965
"""Call an rpc, supplying bulk upload data.
967
:param method: method name to call
968
:param args: parameter args tuple
969
:param body: upload body as a byte string
573
abs_url = self.abspath(relative_url)
576
return RemoteHTTPTransport(abs_url,
577
_from_transport=self,
578
http_transport=self._http_transport)
580
def _redirected_to(self, source, target):
581
"""See transport._redirected_to"""
582
redirected = self._http_transport._redirected_to(source, target)
583
if (redirected is not None
584
and isinstance(redirected, type(self._http_transport))):
585
return RemoteHTTPTransport('bzr+' + redirected.external_url(),
586
http_transport=redirected)
588
# Either None or a transport for a different protocol
class HintingSSHTransport(transport.Transport):
    """Simple transport that handles ssh:// and points out bzr+ssh://."""

    def __init__(self, url):
        raise errors.UnsupportedProtocol(url,
            'bzr supports bzr+ssh to operate over ssh, use "bzr+%s".' % url)
971
self._send_tuple((method,) + args)
972
self._send_bulk_data(body)
973
return self._recv_tuple()
975
def query_version(self):
976
"""Return protocol version number of the server."""
977
# XXX: should make sure it's empty
978
self._send_tuple(('hello',))
979
resp = self._recv_tuple()
980
if resp == ('ok', '1'):
983
raise errors.SmartProtocolError("bad response %r" % (resp,))
986
class SmartTCPTransport(SmartTransport):
987
"""Connection to smart server over plain tcp"""
989
def __init__(self, url, clone_from=None):
990
super(SmartTCPTransport, self).__init__(url, clone_from)
992
self._port = int(self._port)
993
except (ValueError, TypeError), e:
994
raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
997
def _connect_to_server(self):
998
self._socket = socket.socket()
999
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
1000
result = self._socket.connect_ex((self._host, int(self._port)))
1002
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1003
(self._host, self._port, os.strerror(result)))
1004
# TODO: May be more efficient to just treat them as sockets
1005
# throughout? But what about pipes to ssh?...
1006
to_server = self._socket.makefile('w')
1007
from_server = self._socket.makefile('r')
1008
return from_server, to_server
1010
def disconnect(self):
1011
super(SmartTCPTransport, self).disconnect()
1012
# XXX: Is closing the socket as well as closing the files really
1014
if self._socket is not None:
1015
self._socket.close()
1018
from bzrlib.transport import sftp
1019
except errors.ParamikoNotPresent:
1020
# no paramiko, no SSHTransport.
1023
class SmartSSHTransport(SmartTransport):
1024
"""Connection to smart server over SSH."""
1026
def __init__(self, url, clone_from=None):
1027
# TODO: all this probably belongs in the parent class.
1028
super(SmartSSHTransport, self).__init__(url, clone_from)
1030
if self._port is not None:
1031
self._port = int(self._port)
1032
except (ValueError, TypeError), e:
1033
raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
    def _connect_to_server(self):
        # XXX: don't hardcode vendor
        # XXX: cannot pass password to SSHSubprocess yet
        if self._password is not None:
            raise errors.InvalidURL("SSH smart transport doesn't handle passwords")
        self._ssh_connection = sftp.SSHSubprocess(self._host, 'openssh',
                port=self._port, user=self._username,
                command=['bzr', 'serve', '--inet'])
        return self._ssh_connection.get_filelike_channels()

    def disconnect(self):
        super(SmartSSHTransport, self).disconnect()
        self._ssh_connection.close()
600
1050
def get_test_permutations():
601
"""Return (transport, server) permutations for testing."""
602
### We may need a little more test framework support to construct an
603
### appropriate RemoteTransport in the future.
604
from bzrlib.tests import test_server
605
return [(RemoteTCPTransport, test_server.SmartTCPServer_for_testing)]
1051
"""Return (transport, server) permutations for testing"""
1052
return [(SmartTCPTransport, SmartTCPServer_for_testing)]