1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""RemoteTransport client for the smart-server.
19
This module shouldn't be accessed directly. The classes defined here should be
20
imported from bzrlib.smart.
23
__all__ = ['RemoteTransport', 'SmartTCPTransport', 'SmartSSHTransport']
25
from cStringIO import StringIO
42
from bzrlib.bundle.serializer import write_bundle
43
from bzrlib.smart import client, medium, protocol
45
# must do this otherwise urllib can't parse the urls properly :(
46
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh', 'bzr+http']:
47
transport.register_urlparse_netloc_protocol(scheme)
51
# Port 4155 is the default port for bzr://, registered with IANA.
52
BZR_DEFAULT_PORT = 4155
55
def _recv_tuple(from_file):
56
req_line = from_file.readline()
57
return _decode_tuple(req_line)
60
def _decode_tuple(req_line):
61
if req_line == None or req_line == '':
63
if req_line[-1] != '\n':
64
raise errors.SmartProtocolError("request %r not terminated" % req_line)
65
return tuple(req_line[:-1].split('\x01'))
68
def _encode_tuple(args):
69
"""Encode the tuple args to a bytestream."""
70
return '\x01'.join(args) + '\n'
73
class SmartProtocolBase(object):
74
"""Methods common to client and server"""
76
# TODO: this only actually accomodates a single block; possibly should
77
# support multiple chunks?
78
def _encode_bulk_data(self, body):
79
"""Encode body as a bulk data chunk."""
80
return ''.join(('%d\n' % len(body), body, 'done\n'))
82
def _serialise_offsets(self, offsets):
83
"""Serialise a readv offset list."""
85
for start, length in offsets:
86
txt.append('%d,%d' % (start, length))
90
class SmartServerRequestProtocolOne(SmartProtocolBase):
91
"""Server-side encoding and decoding logic for smart version 1."""
93
def __init__(self, backing_transport, write_func):
94
self._backing_transport = backing_transport
95
self.excess_buffer = ''
96
self._finished = False
98
self.has_dispatched = False
100
self._body_decoder = None
101
self._write_func = write_func
103
def accept_bytes(self, bytes):
104
"""Take bytes, and advance the internal state machine appropriately.
106
:param bytes: must be a byte string
108
assert isinstance(bytes, str)
109
self.in_buffer += bytes
110
if not self.has_dispatched:
111
if '\n' not in self.in_buffer:
112
# no command line yet
114
self.has_dispatched = True
116
first_line, self.in_buffer = self.in_buffer.split('\n', 1)
118
req_args = _decode_tuple(first_line)
119
self.request = SmartServerRequestHandler(
120
self._backing_transport)
121
self.request.dispatch_command(req_args[0], req_args[1:])
122
if self.request.finished_reading:
124
self.excess_buffer = self.in_buffer
126
self._send_response(self.request.response.args,
127
self.request.response.body)
128
except KeyboardInterrupt:
130
except Exception, exception:
131
# everything else: pass to client, flush, and quit
132
self._send_response(('error', str(exception)))
135
if self.has_dispatched:
137
# nothing to do.XXX: this routine should be a single state
139
self.excess_buffer += self.in_buffer
142
if self._body_decoder is None:
143
self._body_decoder = LengthPrefixedBodyDecoder()
144
self._body_decoder.accept_bytes(self.in_buffer)
145
self.in_buffer = self._body_decoder.unused_data
146
body_data = self._body_decoder.read_pending_data()
147
self.request.accept_body(body_data)
148
if self._body_decoder.finished_reading:
149
self.request.end_of_body()
150
assert self.request.finished_reading, \
151
"no more body, request not finished"
152
if self.request.response is not None:
153
self._send_response(self.request.response.args,
154
self.request.response.body)
155
self.excess_buffer = self.in_buffer
158
assert not self.request.finished_reading, \
159
"no response and we have finished reading."
161
def _send_response(self, args, body=None):
162
"""Send a smart server response down the output stream."""
163
assert not self._finished, 'response already sent'
164
self._finished = True
165
self._write_func(_encode_tuple(args))
167
assert isinstance(body, str), 'body must be a str'
168
bytes = self._encode_bulk_data(body)
169
self._write_func(bytes)
171
def next_read_size(self):
174
if self._body_decoder is None:
177
return self._body_decoder.next_read_size()
180
class LengthPrefixedBodyDecoder(object):
181
"""Decodes the length-prefixed bulk data."""
184
self.bytes_left = None
185
self.finished_reading = False
186
self.unused_data = ''
187
self.state_accept = self._state_accept_expecting_length
188
self.state_read = self._state_read_no_data
190
self._trailer_buffer = ''
192
def accept_bytes(self, bytes):
193
"""Decode as much of bytes as possible.
195
If 'bytes' contains too much data it will be appended to
198
finished_reading will be set when no more data is required. Further
199
data will be appended to self.unused_data.
201
# accept_bytes is allowed to change the state
202
current_state = self.state_accept
203
self.state_accept(bytes)
204
while current_state != self.state_accept:
205
current_state = self.state_accept
206
self.state_accept('')
208
def next_read_size(self):
209
if self.bytes_left is not None:
210
# Ideally we want to read all the remainder of the body and the
212
return self.bytes_left + 5
213
elif self.state_accept == self._state_accept_reading_trailer:
214
# Just the trailer left
215
return 5 - len(self._trailer_buffer)
216
elif self.state_accept == self._state_accept_expecting_length:
217
# There's still at least 6 bytes left ('\n' to end the length, plus
221
# Reading excess data. Either way, 1 byte at a time is fine.
224
def read_pending_data(self):
225
"""Return any pending data that has been decoded."""
226
return self.state_read()
228
def _state_accept_expecting_length(self, bytes):
229
self._in_buffer += bytes
230
pos = self._in_buffer.find('\n')
233
self.bytes_left = int(self._in_buffer[:pos])
234
self._in_buffer = self._in_buffer[pos+1:]
235
self.bytes_left -= len(self._in_buffer)
236
self.state_accept = self._state_accept_reading_body
237
self.state_read = self._state_read_in_buffer
239
def _state_accept_reading_body(self, bytes):
240
self._in_buffer += bytes
241
self.bytes_left -= len(bytes)
242
if self.bytes_left <= 0:
244
if self.bytes_left != 0:
245
self._trailer_buffer = self._in_buffer[self.bytes_left:]
246
self._in_buffer = self._in_buffer[:self.bytes_left]
247
self.bytes_left = None
248
self.state_accept = self._state_accept_reading_trailer
250
def _state_accept_reading_trailer(self, bytes):
251
self._trailer_buffer += bytes
252
# TODO: what if the trailer does not match "done\n"? Should this raise
253
# a ProtocolViolation exception?
254
if self._trailer_buffer.startswith('done\n'):
255
self.unused_data = self._trailer_buffer[len('done\n'):]
256
self.state_accept = self._state_accept_reading_unused
257
self.finished_reading = True
259
def _state_accept_reading_unused(self, bytes):
260
self.unused_data += bytes
262
def _state_read_no_data(self):
265
def _state_read_in_buffer(self):
266
result = self._in_buffer
271
class SmartServerStreamMedium(object):
272
"""Handles smart commands coming over a stream.
274
The stream may be a pipe connected to sshd, or a tcp socket, or an
275
in-process fifo for testing.
277
One instance is created for each connected client; it can serve multiple
278
requests in the lifetime of the connection.
280
The server passes requests through to an underlying backing transport,
281
which will typically be a LocalTransport looking at the server's filesystem.
284
def __init__(self, backing_transport):
285
"""Construct new server.
287
:param backing_transport: Transport for the directory served.
289
# backing_transport could be passed to serve instead of __init__
290
self.backing_transport = backing_transport
291
self.finished = False
294
"""Serve requests until the client disconnects."""
295
# Keep a reference to stderr because the sys module's globals get set to
296
# None during interpreter shutdown.
297
from sys import stderr
299
while not self.finished:
300
protocol = SmartServerRequestProtocolOne(self.backing_transport,
302
self._serve_one_request(protocol)
304
stderr.write("%s terminating on exception %s\n" % (self, e))
307
def _serve_one_request(self, protocol):
308
"""Read one request from input, process, send back a response.
310
:param protocol: a SmartServerRequestProtocol.
313
self._serve_one_request_unguarded(protocol)
314
except KeyboardInterrupt:
317
self.terminate_due_to_error()
319
def terminate_due_to_error(self):
320
"""Called when an unhandled exception from the protocol occurs."""
321
raise NotImplementedError(self.terminate_due_to_error)
324
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
326
def __init__(self, sock, backing_transport):
329
:param sock: the socket the server will read from. It will be put
332
SmartServerStreamMedium.__init__(self, backing_transport)
334
sock.setblocking(True)
337
def _serve_one_request_unguarded(self, protocol):
338
while protocol.next_read_size():
340
protocol.accept_bytes(self.push_back)
343
bytes = self.socket.recv(4096)
347
protocol.accept_bytes(bytes)
349
self.push_back = protocol.excess_buffer
351
def terminate_due_to_error(self):
352
"""Called when an unhandled exception from the protocol occurs."""
353
# TODO: This should log to a server log file, but no such thing
354
# exists yet. Andrew Bennetts 2006-09-29.
358
def _write_out(self, bytes):
359
self.socket.sendall(bytes)
362
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
364
def __init__(self, in_file, out_file, backing_transport):
365
"""Construct new server.
367
:param in_file: Python file from which requests can be read.
368
:param out_file: Python file to write responses.
369
:param backing_transport: Transport for the directory served.
371
SmartServerStreamMedium.__init__(self, backing_transport)
372
if sys.platform == 'win32':
373
# force binary mode for files
375
for f in (in_file, out_file):
376
fileno = getattr(f, 'fileno', None)
378
msvcrt.setmode(fileno(), os.O_BINARY)
382
def _serve_one_request_unguarded(self, protocol):
384
bytes_to_read = protocol.next_read_size()
385
if bytes_to_read == 0:
386
# Finished serving this request.
389
bytes = self._in.read(bytes_to_read)
391
# Connection has been closed.
395
protocol.accept_bytes(bytes)
397
def terminate_due_to_error(self):
398
# TODO: This should log to a server log file, but no such thing
399
# exists yet. Andrew Bennetts 2006-09-29.
403
def _write_out(self, bytes):
404
self._out.write(bytes)
407
class SmartServerResponse(object):
408
"""Response generated by SmartServerRequestHandler."""
410
def __init__(self, args, body=None):
414
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
415
# for delivering the data for a request. This could be done with as the
416
# StreamServer, though that would create conflation between request and response
417
# which may be undesirable.
420
class SmartServerRequestHandler(object):
421
"""Protocol logic for smart server.
423
This doesn't handle serialization at all, it just processes requests and
427
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
428
# not contain encoding or decoding logic to allow the wire protocol to vary
429
# from the object protocol: we will want to tweak the wire protocol separate
430
# from the object model, and ideally we will be able to do that without
431
# having a SmartServerRequestHandler subclass for each wire protocol, rather
432
# just a Protocol subclass.
434
# TODO: Better way of representing the body for commands that take it,
435
# and allow it to be streamed into the server.
437
def __init__(self, backing_transport):
438
self._backing_transport = backing_transport
439
self._converted_command = False
440
self.finished_reading = False
441
self._body_bytes = ''
444
def accept_body(self, bytes):
447
This should be overriden for each command that desired body data to
448
handle the right format of that data. I.e. plain bytes, a bundle etc.
450
The deserialisation into that format should be done in the Protocol
451
object. Set self.desired_body_format to the format your method will
454
# default fallback is to accumulate bytes.
455
self._body_bytes += bytes
457
def _end_of_body_handler(self):
458
"""An unimplemented end of body handler."""
459
raise NotImplementedError(self._end_of_body_handler)
462
"""Answer a version request with my version."""
463
return SmartServerResponse(('ok', '1'))
465
def do_has(self, relpath):
466
r = self._backing_transport.has(relpath) and 'yes' or 'no'
467
return SmartServerResponse((r,))
469
def do_get(self, relpath):
470
backing_bytes = self._backing_transport.get_bytes(relpath)
471
return SmartServerResponse(('ok',), backing_bytes)
473
def _deserialise_optional_mode(self, mode):
474
# XXX: FIXME this should be on the protocol object.
480
def do_append(self, relpath, mode):
481
self._converted_command = True
482
self._relpath = relpath
483
self._mode = self._deserialise_optional_mode(mode)
484
self._end_of_body_handler = self._handle_do_append_end
486
def _handle_do_append_end(self):
487
old_length = self._backing_transport.append_bytes(
488
self._relpath, self._body_bytes, self._mode)
489
self.response = SmartServerResponse(('appended', '%d' % old_length))
491
def do_delete(self, relpath):
492
self._backing_transport.delete(relpath)
494
def do_iter_files_recursive(self, relpath):
495
transport = self._backing_transport.clone(relpath)
496
filenames = transport.iter_files_recursive()
497
return SmartServerResponse(('names',) + tuple(filenames))
499
def do_list_dir(self, relpath):
500
filenames = self._backing_transport.list_dir(relpath)
501
return SmartServerResponse(('names',) + tuple(filenames))
503
def do_mkdir(self, relpath, mode):
504
self._backing_transport.mkdir(relpath,
505
self._deserialise_optional_mode(mode))
507
def do_move(self, rel_from, rel_to):
508
self._backing_transport.move(rel_from, rel_to)
510
def do_put(self, relpath, mode):
511
self._converted_command = True
512
self._relpath = relpath
513
self._mode = self._deserialise_optional_mode(mode)
514
self._end_of_body_handler = self._handle_do_put
516
def _handle_do_put(self):
517
self._backing_transport.put_bytes(self._relpath,
518
self._body_bytes, self._mode)
519
self.response = SmartServerResponse(('ok',))
521
def _deserialise_offsets(self, text):
522
# XXX: FIXME this should be on the protocol object.
524
for line in text.split('\n'):
527
start, length = line.split(',')
528
offsets.append((int(start), int(length)))
531
def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
532
self._converted_command = True
533
self._end_of_body_handler = self._handle_put_non_atomic
534
self._relpath = relpath
535
self._dir_mode = self._deserialise_optional_mode(dir_mode)
536
self._mode = self._deserialise_optional_mode(mode)
537
# a boolean would be nicer XXX
538
self._create_parent = (create_parent == 'T')
540
def _handle_put_non_atomic(self):
541
self._backing_transport.put_bytes_non_atomic(self._relpath,
544
create_parent_dir=self._create_parent,
545
dir_mode=self._dir_mode)
546
self.response = SmartServerResponse(('ok',))
548
def do_readv(self, relpath):
549
self._converted_command = True
550
self._end_of_body_handler = self._handle_readv_offsets
551
self._relpath = relpath
553
def end_of_body(self):
554
"""No more body data will be received."""
555
self._run_handler_code(self._end_of_body_handler, (), {})
556
# cannot read after this.
557
self.finished_reading = True
559
def _handle_readv_offsets(self):
560
"""accept offsets for a readv request."""
561
offsets = self._deserialise_offsets(self._body_bytes)
562
backing_bytes = ''.join(bytes for offset, bytes in
563
self._backing_transport.readv(self._relpath, offsets))
564
self.response = SmartServerResponse(('readv',), backing_bytes)
566
def do_rename(self, rel_from, rel_to):
567
self._backing_transport.rename(rel_from, rel_to)
569
def do_rmdir(self, relpath):
570
self._backing_transport.rmdir(relpath)
572
def do_stat(self, relpath):
573
stat = self._backing_transport.stat(relpath)
574
return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
576
def do_get_bundle(self, path, revision_id):
577
# open transport relative to our base
578
t = self._backing_transport.clone(path)
579
control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
580
repo = control.open_repository()
581
tmpf = tempfile.TemporaryFile()
582
base_revision = revision.NULL_REVISION
583
write_bundle(repo, revision_id, base_revision, tmpf)
585
return SmartServerResponse((), tmpf.read())
587
def dispatch_command(self, cmd, args):
588
"""Deprecated compatibility method.""" # XXX XXX
589
func = getattr(self, 'do_' + cmd, None)
591
raise errors.SmartProtocolError("bad request %r" % (cmd,))
592
self._run_handler_code(func, args, {})
594
def _run_handler_code(self, callable, args, kwargs):
595
"""Run some handler specific code 'callable'.
597
If a result is returned, it is considered to be the commands response,
598
and finished_reading is set true, and its assigned to self.response.
600
Any exceptions caught are translated and a response object created
603
result = self._call_converting_errors(callable, args, kwargs)
604
if result is not None:
605
self.response = result
606
self.finished_reading = True
607
# handle unconverted commands
608
if not self._converted_command:
609
self.finished_reading = True
611
self.response = SmartServerResponse(('ok',))
613
def _call_converting_errors(self, callable, args, kwargs):
614
"""Call callable converting errors to Response objects."""
616
return callable(*args, **kwargs)
617
except errors.NoSuchFile, e:
618
return SmartServerResponse(('NoSuchFile', e.path))
619
except errors.FileExists, e:
620
return SmartServerResponse(('FileExists', e.path))
621
except errors.DirectoryNotEmpty, e:
622
return SmartServerResponse(('DirectoryNotEmpty', e.path))
623
except errors.ShortReadvError, e:
624
return SmartServerResponse(('ShortReadvError',
625
e.path, str(e.offset), str(e.length), str(e.actual)))
626
except UnicodeError, e:
627
# If it is a DecodeError, than most likely we are starting
628
# with a plain string
629
str_or_unicode = e.object
630
if isinstance(str_or_unicode, unicode):
631
# XXX: UTF-8 might have \x01 (our seperator byte) in it. We
632
# should escape it somehow.
633
val = 'u:' + str_or_unicode.encode('utf-8')
635
val = 's:' + str_or_unicode.encode('base64')
636
# This handles UnicodeEncodeError or UnicodeDecodeError
637
return SmartServerResponse((e.__class__.__name__,
638
e.encoding, val, str(e.start), str(e.end), e.reason))
639
except errors.TransportNotPossible, e:
640
if e.msg == "readonly transport":
641
return SmartServerResponse(('ReadOnlyError', ))
646
class SmartTCPServer(object):
647
"""Listens on a TCP socket and accepts connections from smart clients"""
649
def __init__(self, backing_transport, host='127.0.0.1', port=0):
650
"""Construct a new server.
652
To actually start it running, call either start_background_thread or
655
:param host: Name of the interface to listen on.
656
:param port: TCP port to listen on, or 0 to allocate a transient port.
658
self._server_socket = socket.socket()
659
self._server_socket.bind((host, port))
660
self.port = self._server_socket.getsockname()[1]
661
self._server_socket.listen(1)
662
self._server_socket.settimeout(1)
663
self.backing_transport = backing_transport
666
# let connections timeout so that we get a chance to terminate
667
# Keep a reference to the exceptions we want to catch because the socket
668
# module's globals get set to None during interpreter shutdown.
669
from socket import timeout as socket_timeout
670
from socket import error as socket_error
671
self._should_terminate = False
672
while not self._should_terminate:
674
self.accept_and_serve()
675
except socket_timeout:
676
# just check if we're asked to stop
678
except socket_error, e:
679
trace.warning("client disconnected: %s", e)
683
"""Return the url of the server"""
684
return "bzr://%s:%d/" % self._server_socket.getsockname()
686
def accept_and_serve(self):
687
conn, client_addr = self._server_socket.accept()
688
# For WIN32, where the timeout value from the listening socket
689
# propogates to the newly accepted socket.
690
conn.setblocking(True)
691
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
692
handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
693
connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
694
connection_thread.setDaemon(True)
695
connection_thread.start()
697
def start_background_thread(self):
698
self._server_thread = threading.Thread(None,
700
name='server-' + self.get_url())
701
self._server_thread.setDaemon(True)
702
self._server_thread.start()
704
def stop_background_thread(self):
705
self._should_terminate = True
706
# At one point we would wait to join the threads here, but it looks
707
# like they don't actually exit. So now we just leave them running
708
# and expect to terminate the process. -- mbp 20070215
709
# self._server_socket.close()
710
## sys.stderr.write("waiting for server thread to finish...")
711
## self._server_thread.join()
714
class SmartTCPServer_for_testing(SmartTCPServer):
715
"""Server suitable for use by transport tests.
717
This server is backed by the process's cwd.
721
self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
722
# The server is set up by default like for ssh access: the client
723
# passes filesystem-absolute paths; therefore the server must look
724
# them up relative to the root directory. it might be better to act
725
# a public server and have the server rewrite paths into the test
727
SmartTCPServer.__init__(self,
728
transport.get_transport(urlutils.local_path_to_url('/')))
731
"""Set up server for testing"""
732
self.start_background_thread()
735
self.stop_background_thread()
738
"""Return the url of the server"""
739
host, port = self._server_socket.getsockname()
740
return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
742
def get_bogus_url(self):
743
"""Return a URL which will fail to connect"""
744
return 'bzr://127.0.0.1:1/'
747
class _SmartStat(object):
749
def __init__(self, size, mode):
754
class RemoteTransport(transport.Transport):
755
"""Connection to a smart server.
757
The connection holds references to the medium that can be used to send
758
requests to the server.
760
The connection has a notion of the current directory to which it's
761
connected; this is incorporated in filenames passed to the server.
763
This supports some higher-level RPC operations and can also be treated
764
like a Transport to do file-like operations.
766
The connection can be made over a tcp socket, an ssh pipe or a series of
767
http requests. There are concrete subclasses for each type:
768
SmartTCPTransport, etc.
771
# IMPORTANT FOR IMPLEMENTORS: RemoteTransport MUST NOT be given encoding
772
# responsibilities: Put those on SmartClient or similar. This is vital for
773
# the ability to support multiple versions of the smart protocol over time:
774
# RemoteTransport is an adapter from the Transport object model to the
775
# SmartClient model, not an encoder.
777
def __init__(self, url, clone_from=None, medium=None):
780
:param medium: The medium to use for this RemoteTransport. This must be
781
supplied if clone_from is None.
783
### Technically super() here is faulty because Transport's __init__
784
### fails to take 2 parameters, and if super were to choose a silly
785
### initialisation order things would blow up.
786
if not url.endswith('/'):
788
super(RemoteTransport, self).__init__(url)
789
self._scheme, self._username, self._password, self._host, self._port, self._path = \
790
transport.split_url(url)
791
if clone_from is None:
792
self._medium = medium
794
# credentials may be stripped from the base in some circumstances
795
# as yet to be clearly defined or documented, so copy them.
796
self._username = clone_from._username
797
# reuse same connection
798
self._medium = clone_from._medium
799
assert self._medium is not None
801
def abspath(self, relpath):
802
"""Return the full url to the given relative path.
804
@param relpath: the relative path or path components
805
@type relpath: str or list
807
return self._unparse_url(self._remote_path(relpath))
809
def clone(self, relative_url):
810
"""Make a new RemoteTransport related to me, sharing the same connection.
812
This essentially opens a handle on a different remote directory.
814
if relative_url is None:
815
return RemoteTransport(self.base, self)
817
return RemoteTransport(self.abspath(relative_url), self)
819
def is_readonly(self):
820
"""Smart server transport can do read/write file operations."""
821
resp = self._call2('Transport.is_readonly')
822
if resp == ('yes', ):
824
elif resp == ('no', ):
827
self._translate_error(resp)
828
assert False, 'weird response %r' % (resp,)
830
def get_smart_client(self):
833
def get_smart_medium(self):
836
def _unparse_url(self, path):
837
"""Return URL for a path.
839
:see: SFTPUrlHandling._unparse_url
841
# TODO: Eventually it should be possible to unify this with
842
# SFTPUrlHandling._unparse_url?
845
path = urllib.quote(path)
846
netloc = urllib.quote(self._host)
847
if self._username is not None:
848
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
849
if self._port is not None:
850
netloc = '%s:%d' % (netloc, self._port)
851
return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
853
def _remote_path(self, relpath):
854
"""Returns the Unicode version of the absolute path for relpath."""
855
return self._combine_paths(self._path, relpath)
857
def _call(self, method, *args):
858
resp = self._call2(method, *args)
859
self._translate_error(resp)
861
def _call2(self, method, *args):
862
"""Call a method on the remote server."""
863
return client.SmartClient(self._medium).call(method, *args)
865
def _call_with_body_bytes(self, method, args, body):
866
"""Call a method on the remote server with body bytes."""
867
smart_client = client.SmartClient(self._medium)
868
return smart_client.call_with_body_bytes(method, args, body)
870
def has(self, relpath):
871
"""Indicate whether a remote file of the given name exists or not.
873
:see: Transport.has()
875
resp = self._call2('has', self._remote_path(relpath))
876
if resp == ('yes', ):
878
elif resp == ('no', ):
881
self._translate_error(resp)
883
def get(self, relpath):
884
"""Return file-like object reading the contents of a remote file.
886
:see: Transport.get_bytes()/get_file()
888
return StringIO(self.get_bytes(relpath))
890
def get_bytes(self, relpath):
891
remote = self._remote_path(relpath)
892
request = self._medium.get_request()
893
smart_protocol = protocol.SmartClientRequestProtocolOne(request)
894
smart_protocol.call('get', remote)
895
resp = smart_protocol.read_response_tuple(True)
897
smart_protocol.cancel_read_body()
898
self._translate_error(resp, relpath)
899
return smart_protocol.read_body_bytes()
901
def _serialise_optional_mode(self, mode):
907
def mkdir(self, relpath, mode=None):
908
resp = self._call2('mkdir', self._remote_path(relpath),
909
self._serialise_optional_mode(mode))
910
self._translate_error(resp)
912
def put_bytes(self, relpath, upload_contents, mode=None):
913
# FIXME: upload_file is probably not safe for non-ascii characters -
914
# should probably just pass all parameters as length-delimited
916
resp = self._call_with_body_bytes('put',
917
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
919
self._translate_error(resp)
921
def put_bytes_non_atomic(self, relpath, bytes, mode=None,
922
create_parent_dir=False,
924
"""See Transport.put_bytes_non_atomic."""
925
# FIXME: no encoding in the transport!
926
create_parent_str = 'F'
927
if create_parent_dir:
928
create_parent_str = 'T'
930
resp = self._call_with_body_bytes(
932
(self._remote_path(relpath), self._serialise_optional_mode(mode),
933
create_parent_str, self._serialise_optional_mode(dir_mode)),
935
self._translate_error(resp)
937
def put_file(self, relpath, upload_file, mode=None):
938
# its not ideal to seek back, but currently put_non_atomic_file depends
939
# on transports not reading before failing - which is a faulty
940
# assumption I think - RBC 20060915
941
pos = upload_file.tell()
943
return self.put_bytes(relpath, upload_file.read(), mode)
945
upload_file.seek(pos)
948
def put_file_non_atomic(self, relpath, f, mode=None,
949
create_parent_dir=False,
951
return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
952
create_parent_dir=create_parent_dir,
955
def append_file(self, relpath, from_file, mode=None):
956
return self.append_bytes(relpath, from_file.read(), mode)
958
def append_bytes(self, relpath, bytes, mode=None):
959
resp = self._call_with_body_bytes(
961
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
963
if resp[0] == 'appended':
965
self._translate_error(resp)
967
def delete(self, relpath):
968
resp = self._call2('delete', self._remote_path(relpath))
969
self._translate_error(resp)
971
def readv(self, relpath, offsets):
975
offsets = list(offsets)
977
sorted_offsets = sorted(offsets)
978
# turn the list of offsets into a stack
979
offset_stack = iter(offsets)
980
cur_offset_and_size = offset_stack.next()
981
coalesced = list(self._coalesce_offsets(sorted_offsets,
982
limit=self._max_readv_combine,
983
fudge_factor=self._bytes_to_read_before_seek))
985
request = self._medium.get_request()
986
smart_protocol = protocol.SmartClientRequestProtocolOne(request)
987
smart_protocol.call_with_body_readv_array(
988
('readv', self._remote_path(relpath)),
989
[(c.start, c.length) for c in coalesced])
990
resp = smart_protocol.read_response_tuple(True)
992
if resp[0] != 'readv':
993
# This should raise an exception
994
smart_protocol.cancel_read_body()
995
self._translate_error(resp)
998
# FIXME: this should know how many bytes are needed, for clarity.
999
data = smart_protocol.read_body_bytes()
1000
# Cache the results, but only until they have been fulfilled
1002
for c_offset in coalesced:
1003
if len(data) < c_offset.length:
1004
raise errors.ShortReadvError(relpath, c_offset.start,
1005
c_offset.length, actual=len(data))
1006
for suboffset, subsize in c_offset.ranges:
1007
key = (c_offset.start+suboffset, subsize)
1008
data_map[key] = data[suboffset:suboffset+subsize]
1009
data = data[c_offset.length:]
1011
# Now that we've read some data, see if we can yield anything back
1012
while cur_offset_and_size in data_map:
1013
this_data = data_map.pop(cur_offset_and_size)
1014
yield cur_offset_and_size[0], this_data
1015
cur_offset_and_size = offset_stack.next()
1017
def rename(self, rel_from, rel_to):
1018
self._call('rename',
1019
self._remote_path(rel_from),
1020
self._remote_path(rel_to))
1022
def move(self, rel_from, rel_to):
1024
self._remote_path(rel_from),
1025
self._remote_path(rel_to))
1027
def rmdir(self, relpath):
1028
resp = self._call('rmdir', self._remote_path(relpath))
1030
def _translate_error(self, resp, orig_path=None):
1031
"""Raise an exception from a response"""
1038
elif what == 'NoSuchFile':
1039
if orig_path is not None:
1040
error_path = orig_path
1042
error_path = resp[1]
1043
raise errors.NoSuchFile(error_path)
1044
elif what == 'error':
1045
raise errors.SmartProtocolError(unicode(resp[1]))
1046
elif what == 'FileExists':
1047
raise errors.FileExists(resp[1])
1048
elif what == 'DirectoryNotEmpty':
1049
raise errors.DirectoryNotEmpty(resp[1])
1050
elif what == 'ShortReadvError':
1051
raise errors.ShortReadvError(resp[1], int(resp[2]),
1052
int(resp[3]), int(resp[4]))
1053
elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
1054
encoding = str(resp[1]) # encoding must always be a string
1056
start = int(resp[3])
1058
reason = str(resp[5]) # reason must always be a string
1059
if val.startswith('u:'):
1060
val = val[2:].decode('utf-8')
1061
elif val.startswith('s:'):
1062
val = val[2:].decode('base64')
1063
if what == 'UnicodeDecodeError':
1064
raise UnicodeDecodeError(encoding, val, start, end, reason)
1065
elif what == 'UnicodeEncodeError':
1066
raise UnicodeEncodeError(encoding, val, start, end, reason)
1067
elif what == "ReadOnlyError":
1068
raise errors.TransportNotPossible('readonly transport')
1070
raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
1072
def disconnect(self):
1073
self._medium.disconnect()
1075
def delete_tree(self, relpath):
1076
raise errors.TransportNotPossible('readonly transport')
1078
def stat(self, relpath):
1079
resp = self._call2('stat', self._remote_path(relpath))
1080
if resp[0] == 'stat':
1081
return _SmartStat(int(resp[1]), int(resp[2], 8))
1083
self._translate_error(resp)
1085
## def lock_read(self, relpath):
1086
## """Lock the given file for shared (read) access.
1087
## :return: A lock object, which should be passed to Transport.unlock()
1089
## # The old RemoteBranch ignore lock for reading, so we will
1090
## # continue that tradition and return a bogus lock object.
1091
## class BogusLock(object):
1092
## def __init__(self, path):
1094
## def unlock(self):
1096
## return BogusLock(relpath)
1101
def list_dir(self, relpath):
1102
resp = self._call2('list_dir', self._remote_path(relpath))
1103
if resp[0] == 'names':
1104
return [name.encode('ascii') for name in resp[1:]]
1106
self._translate_error(resp)
1108
def iter_files_recursive(self):
1109
resp = self._call2('iter_files_recursive', self._remote_path(''))
1110
if resp[0] == 'names':
1113
self._translate_error(resp)
1116
class SmartTCPTransport(RemoteTransport):
1117
"""Connection to smart server over plain tcp.
1119
This is essentially just a factory to get 'RemoteTransport(url,
1120
SmartTCPClientMedium).
1123
def __init__(self, url):
1124
_scheme, _username, _password, _host, _port, _path = \
1125
transport.split_url(url)
1127
_port = BZR_DEFAULT_PORT
1131
except (ValueError, TypeError), e:
1132
raise errors.InvalidURL(
1133
path=url, extra="invalid port %s" % _port)
1134
client_medium = medium.SmartTCPClientMedium(_host, _port)
1135
super(SmartTCPTransport, self).__init__(url, medium=client_medium)
1138
class SmartSSHTransport(RemoteTransport):
1139
"""Connection to smart server over SSH.
1141
This is essentially just a factory to get 'RemoteTransport(url,
1142
SmartSSHClientMedium).
1145
def __init__(self, url):
1146
_scheme, _username, _password, _host, _port, _path = \
1147
transport.split_url(url)
1149
if _port is not None:
1151
except (ValueError, TypeError), e:
1152
raise errors.InvalidURL(path=url, extra="invalid port %s" %
1154
client_medium = medium.SmartSSHClientMedium(_host, _port,
1155
_username, _password)
1156
super(SmartSSHTransport, self).__init__(url, medium=client_medium)
1159
class SmartHTTPTransport(RemoteTransport):
1160
"""Just a way to connect between a bzr+http:// url and http://.
1162
This connection operates slightly differently than the SmartSSHTransport.
1163
It uses a plain http:// transport underneath, which defines what remote
1164
.bzr/smart URL we are connected to. From there, all paths that are sent are
1165
sent as relative paths, this way, the remote side can properly
1166
de-reference them, since it is likely doing rewrite rules to translate an
1167
HTTP path into a local path.
1170
def __init__(self, url, http_transport=None):
1171
assert url.startswith('bzr+http://')
1173
if http_transport is None:
1174
http_url = url[len('bzr+'):]
1175
self._http_transport = transport.get_transport(http_url)
1177
self._http_transport = http_transport
1178
http_medium = self._http_transport.get_smart_medium()
1179
super(SmartHTTPTransport, self).__init__(url, medium=http_medium)
1181
def _remote_path(self, relpath):
1182
"""After connecting HTTP Transport only deals in relative URLs."""
1188
def abspath(self, relpath):
1189
"""Return the full url to the given relative path.
1191
:param relpath: the relative path or path components
1192
:type relpath: str or list
1194
return self._unparse_url(self._combine_paths(self._path, relpath))
1196
def clone(self, relative_url):
1197
"""Make a new SmartHTTPTransport related to me.
1199
This is re-implemented rather than using the default
1200
SmartTransport.clone() because we must be careful about the underlying
1204
abs_url = self.abspath(relative_url)
1207
# By cloning the underlying http_transport, we are able to share the
1209
new_transport = self._http_transport.clone(relative_url)
1210
return SmartHTTPTransport(abs_url, http_transport=new_transport)
1213
def get_test_permutations():
1214
"""Return (transport, server) permutations for testing."""
1215
### We may need a little more test framework support to construct an
1216
### appropriate RemoteTransport in the future.
1217
from bzrlib.smart import server
1218
return [(SmartTCPTransport, server.SmartTCPServer_for_testing)]