1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""RemoteTransport client for the smart-server.

This module shouldn't be accessed directly.  The classes defined here should be
imported from bzrlib.smart.
"""
23
__all__ = ['RemoteTransport', 'SmartTCPTransport', 'SmartSSHTransport']
25
from cStringIO import StringIO
42
from bzrlib.bundle.serializer import write_bundle
43
from bzrlib.smart import client, medium, protocol
45
# must do this otherwise urllib can't parse the urls properly :(
46
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh', 'bzr+http']:
    # One registration call per URL scheme.
    transport.register_urlparse_netloc_protocol(scheme)
51
# Port 4155 is the default port for bzr://, registered with IANA.
52
BZR_DEFAULT_PORT = 4155
55
def _recv_tuple(from_file):
    """Read one line from from_file and decode it with _decode_tuple.

    :param from_file: object providing readline().
    :return: whatever _decode_tuple returns for the line that was read.
    """
    req_line = from_file.readline()
    return _decode_tuple(req_line)
60
def _decode_tuple(req_line):
61
if req_line == None or req_line == '':
63
if req_line[-1] != '\n':
64
raise errors.SmartProtocolError("request %r not terminated" % req_line)
65
return tuple(req_line[:-1].split('\x01'))
68
def _encode_tuple(args):
69
"""Encode the tuple args to a bytestream."""
70
return '\x01'.join(args) + '\n'
73
class SmartProtocolBase(object):
74
"""Methods common to client and server"""
76
# TODO: this only actually accomodates a single block; possibly should
77
# support multiple chunks?
78
def _encode_bulk_data(self, body):
79
"""Encode body as a bulk data chunk."""
80
return ''.join(('%d\n' % len(body), body, 'done\n'))
82
def _serialise_offsets(self, offsets):
83
"""Serialise a readv offset list."""
85
for start, length in offsets:
86
txt.append('%d,%d' % (start, length))
90
class SmartServerRequestProtocolOne(SmartProtocolBase):
91
"""Server-side encoding and decoding logic for smart version 1."""
93
def __init__(self, backing_transport, write_func):
94
self._backing_transport = backing_transport
95
self.excess_buffer = ''
96
self._finished = False
98
self.has_dispatched = False
100
self._body_decoder = None
101
self._write_func = write_func
103
def accept_bytes(self, bytes):
104
"""Take bytes, and advance the internal state machine appropriately.
106
:param bytes: must be a byte string
108
assert isinstance(bytes, str)
109
self.in_buffer += bytes
110
if not self.has_dispatched:
111
if '\n' not in self.in_buffer:
112
# no command line yet
114
self.has_dispatched = True
116
first_line, self.in_buffer = self.in_buffer.split('\n', 1)
118
req_args = _decode_tuple(first_line)
119
self.request = SmartServerRequestHandler(
120
self._backing_transport)
121
self.request.dispatch_command(req_args[0], req_args[1:])
122
if self.request.finished_reading:
124
self.excess_buffer = self.in_buffer
126
self._send_response(self.request.response.args,
127
self.request.response.body)
128
except KeyboardInterrupt:
130
except Exception, exception:
131
# everything else: pass to client, flush, and quit
132
self._send_response(('error', str(exception)))
135
if self.has_dispatched:
137
# nothing to do.XXX: this routine should be a single state
139
self.excess_buffer += self.in_buffer
142
if self._body_decoder is None:
143
self._body_decoder = LengthPrefixedBodyDecoder()
144
self._body_decoder.accept_bytes(self.in_buffer)
145
self.in_buffer = self._body_decoder.unused_data
146
body_data = self._body_decoder.read_pending_data()
147
self.request.accept_body(body_data)
148
if self._body_decoder.finished_reading:
149
self.request.end_of_body()
150
assert self.request.finished_reading, \
151
"no more body, request not finished"
152
if self.request.response is not None:
153
self._send_response(self.request.response.args,
154
self.request.response.body)
155
self.excess_buffer = self.in_buffer
158
assert not self.request.finished_reading, \
159
"no response and we have finished reading."
161
def _send_response(self, args, body=None):
162
"""Send a smart server response down the output stream."""
163
assert not self._finished, 'response already sent'
164
self._finished = True
165
self._write_func(_encode_tuple(args))
167
assert isinstance(body, str), 'body must be a str'
168
bytes = self._encode_bulk_data(body)
169
self._write_func(bytes)
171
def next_read_size(self):
174
if self._body_decoder is None:
177
return self._body_decoder.next_read_size()
180
class LengthPrefixedBodyDecoder(object):
    """Decodes the length-prefixed bulk data.

    The expected input is a decimal length, a newline, that many body bytes
    and then the trailer 'done\\n'.  Decoding is a small state machine:
    self.state_accept consumes incoming bytes and self.state_read hands out
    whatever body data has been decoded so far.
    """

    def __init__(self):
        # Body bytes still expected; None before the length is known and
        # again once the body is complete.
        self.bytes_left = None
        self.finished_reading = False
        self.unused_data = ''
        self.state_accept = self._state_accept_expecting_length
        self.state_read = self._state_read_no_data
        self._in_buffer = ''
        self._trailer_buffer = ''

    def accept_bytes(self, bytes):
        """Decode as much of bytes as possible.

        If 'bytes' contains too much data it will be appended to
        self.unused_data.

        finished_reading will be set when no more data is required.  Further
        data will be appended to self.unused_data.
        """
        # accept_bytes is allowed to change the state
        current_state = self.state_accept
        self.state_accept(bytes)
        # Keep poking the new state with no extra input until the machine
        # settles, so buffered bytes flow through every stage.
        while current_state != self.state_accept:
            current_state = self.state_accept
            self.state_accept('')

    def next_read_size(self):
        """Return a suggested number of bytes to read next."""
        if self.bytes_left is not None:
            # Ideally we want to read all the remainder of the body and the
            # trailer in one go.
            return self.bytes_left + 5
        elif self.state_accept == self._state_accept_reading_trailer:
            # Just the trailer left
            return 5 - len(self._trailer_buffer)
        elif self.state_accept == self._state_accept_expecting_length:
            # There's still at least 6 bytes left ('\n' to end the length, plus
            # 'done\n').
            return 6
        else:
            # Reading excess data.  Either way, 1 byte at a time is fine.
            return 1

    def read_pending_data(self):
        """Return any pending data that has been decoded."""
        return self.state_read()

    def _state_accept_expecting_length(self, bytes):
        """Accumulate input until the length line is complete."""
        self._in_buffer += bytes
        pos = self._in_buffer.find('\n')
        if pos == -1:
            # Length line not complete yet.
            return
        self.bytes_left = int(self._in_buffer[:pos])
        self._in_buffer = self._in_buffer[pos+1:]
        self.bytes_left -= len(self._in_buffer)
        self.state_accept = self._state_accept_reading_body
        self.state_read = self._state_read_in_buffer

    def _state_accept_reading_body(self, bytes):
        """Accumulate body bytes; split off anything past the body."""
        self._in_buffer += bytes
        self.bytes_left -= len(bytes)
        if self.bytes_left <= 0:
            # Finished with body
            if self.bytes_left != 0:
                # A negative count is the number of surplus bytes at the end
                # of the buffer; they belong to the trailer (and beyond).
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
                self._in_buffer = self._in_buffer[:self.bytes_left]
            self.bytes_left = None
            self.state_accept = self._state_accept_reading_trailer

    def _state_accept_reading_trailer(self, bytes):
        """Accumulate input until the 'done\\n' trailer has been seen."""
        self._trailer_buffer += bytes
        # TODO: what if the trailer does not match "done\n"?  Should this raise
        # a ProtocolViolation exception?
        if self._trailer_buffer.startswith('done\n'):
            self.unused_data = self._trailer_buffer[len('done\n'):]
            self.state_accept = self._state_accept_reading_unused
            self.finished_reading = True

    def _state_accept_reading_unused(self, bytes):
        """Collect everything after the trailer in unused_data."""
        self.unused_data += bytes

    def _state_read_no_data(self):
        """Reader used before the length line has been parsed."""
        return ''

    def _state_read_in_buffer(self):
        """Return and clear the body bytes buffered so far."""
        result = self._in_buffer
        self._in_buffer = ''
        return result
271
class SmartServerStreamMedium(object):
272
"""Handles smart commands coming over a stream.
274
The stream may be a pipe connected to sshd, or a tcp socket, or an
275
in-process fifo for testing.
277
One instance is created for each connected client; it can serve multiple
278
requests in the lifetime of the connection.
280
The server passes requests through to an underlying backing transport,
281
which will typically be a LocalTransport looking at the server's filesystem.
284
def __init__(self, backing_transport):
285
"""Construct new server.
287
:param backing_transport: Transport for the directory served.
289
# backing_transport could be passed to serve instead of __init__
290
self.backing_transport = backing_transport
291
self.finished = False
294
"""Serve requests until the client disconnects."""
295
# Keep a reference to stderr because the sys module's globals get set to
296
# None during interpreter shutdown.
297
from sys import stderr
299
while not self.finished:
300
protocol = SmartServerRequestProtocolOne(self.backing_transport,
302
self._serve_one_request(protocol)
304
stderr.write("%s terminating on exception %s\n" % (self, e))
307
def _serve_one_request(self, protocol):
308
"""Read one request from input, process, send back a response.
310
:param protocol: a SmartServerRequestProtocol.
313
self._serve_one_request_unguarded(protocol)
314
except KeyboardInterrupt:
317
self.terminate_due_to_error()
319
def terminate_due_to_error(self):
320
"""Called when an unhandled exception from the protocol occurs."""
321
raise NotImplementedError(self.terminate_due_to_error)
324
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
    """Stream medium that reads requests from, and writes to, a socket."""

    def __init__(self, sock, backing_transport):
        """Constructor.

        :param sock: the socket the server will read from.  It will be put
            into blocking mode.
        :param backing_transport: Transport for the directory served.
        """
        SmartServerStreamMedium.__init__(self, backing_transport)
        # Bytes read past the end of the previous request.
        self.push_back = ''
        sock.setblocking(True)
        self.socket = sock

    def _serve_one_request_unguarded(self, protocol):
        """Feed protocol from the socket until it wants no more bytes."""
        while protocol.next_read_size():
            if self.push_back:
                # Replay leftovers from the previous request before reading
                # anything new.
                protocol.accept_bytes(self.push_back)
                self.push_back = ''
            else:
                bytes = self.socket.recv(4096)
                if bytes == '':
                    # Empty read: the peer has closed the connection.
                    self.finished = True
                    return
                protocol.accept_bytes(bytes)

        self.push_back = protocol.excess_buffer

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        # TODO: This should log to a server log file, but no such thing
        # exists yet.  Andrew Bennetts 2006-09-29.
        self.socket.close()
        self.finished = True

    def _write_out(self, bytes):
        """Send bytes to the client."""
        self.socket.sendall(bytes)
362
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
    """Stream medium that reads requests from one file and writes to another."""

    def __init__(self, in_file, out_file, backing_transport):
        """Construct new server.

        :param in_file: Python file from which requests can be read.
        :param out_file: Python file to write responses.
        :param backing_transport: Transport for the directory served.
        """
        SmartServerStreamMedium.__init__(self, backing_transport)
        if sys.platform == 'win32':
            # force binary mode for files
            import msvcrt
            for f in (in_file, out_file):
                fileno = getattr(f, 'fileno', None)
                if fileno:
                    msvcrt.setmode(fileno(), os.O_BINARY)
        self._in = in_file
        self._out = out_file

    def _serve_one_request_unguarded(self, protocol):
        """Feed protocol from the input file until it wants no more bytes."""
        while True:
            bytes_to_read = protocol.next_read_size()
            if bytes_to_read == 0:
                # Finished serving this request.
                self._out.flush()
                return
            bytes = self._in.read(bytes_to_read)
            if bytes == '':
                # Connection has been closed.
                self.finished = True
                self._out.flush()
                return
            protocol.accept_bytes(bytes)

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        # TODO: This should log to a server log file, but no such thing
        # exists yet.  Andrew Bennetts 2006-09-29.
        self._out.close()
        self.finished = True

    def _write_out(self, bytes):
        """Write bytes to the output file."""
        self._out.write(bytes)
407
class SmartServerResponse(object):
    """Response generated by SmartServerRequestHandler."""

    def __init__(self, args, body=None):
        """Store the response tuple and its optional body.

        :param args: tuple of response arguments.
        :param body: optional body, or None when there is none.
        """
        self.args = args
        self.body = body
414
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
415
# for delivering the data for a request. This could be done with as the
416
# StreamServer, though that would create conflation between request and response
417
# which may be undesirable.
420
class SmartServerRequestHandler(object):
421
"""Protocol logic for smart server.
423
This doesn't handle serialization at all, it just processes requests and
427
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
428
# not contain encoding or decoding logic to allow the wire protocol to vary
429
# from the object protocol: we will want to tweak the wire protocol separate
430
# from the object model, and ideally we will be able to do that without
431
# having a SmartServerRequestHandler subclass for each wire protocol, rather
432
# just a Protocol subclass.
434
# TODO: Better way of representing the body for commands that take it,
435
# and allow it to be streamed into the server.
437
def __init__(self, backing_transport):
438
self._backing_transport = backing_transport
439
self._converted_command = False
440
self.finished_reading = False
441
self._body_bytes = ''
444
def accept_body(self, bytes):
447
This should be overriden for each command that desired body data to
448
handle the right format of that data. I.e. plain bytes, a bundle etc.
450
The deserialisation into that format should be done in the Protocol
451
object. Set self.desired_body_format to the format your method will
454
# default fallback is to accumulate bytes.
455
self._body_bytes += bytes
457
def _end_of_body_handler(self):
458
"""An unimplemented end of body handler."""
459
raise NotImplementedError(self._end_of_body_handler)
462
"""Answer a version request with my version."""
463
return SmartServerResponse(('ok', '1'))
465
def do_has(self, relpath):
466
r = self._backing_transport.has(relpath) and 'yes' or 'no'
467
return SmartServerResponse((r,))
469
def do_get(self, relpath):
470
backing_bytes = self._backing_transport.get_bytes(relpath)
471
return SmartServerResponse(('ok',), backing_bytes)
473
def _deserialise_optional_mode(self, mode):
474
# XXX: FIXME this should be on the protocol object.
480
def do_append(self, relpath, mode):
481
self._converted_command = True
482
self._relpath = relpath
483
self._mode = self._deserialise_optional_mode(mode)
484
self._end_of_body_handler = self._handle_do_append_end
486
def _handle_do_append_end(self):
487
old_length = self._backing_transport.append_bytes(
488
self._relpath, self._body_bytes, self._mode)
489
self.response = SmartServerResponse(('appended', '%d' % old_length))
491
def do_delete(self, relpath):
492
self._backing_transport.delete(relpath)
494
def do_iter_files_recursive(self, relpath):
495
transport = self._backing_transport.clone(relpath)
496
filenames = transport.iter_files_recursive()
497
return SmartServerResponse(('names',) + tuple(filenames))
499
def do_list_dir(self, relpath):
500
filenames = self._backing_transport.list_dir(relpath)
501
return SmartServerResponse(('names',) + tuple(filenames))
503
def do_mkdir(self, relpath, mode):
504
self._backing_transport.mkdir(relpath,
505
self._deserialise_optional_mode(mode))
507
def do_move(self, rel_from, rel_to):
508
self._backing_transport.move(rel_from, rel_to)
510
def do_put(self, relpath, mode):
511
self._converted_command = True
512
self._relpath = relpath
513
self._mode = self._deserialise_optional_mode(mode)
514
self._end_of_body_handler = self._handle_do_put
516
def _handle_do_put(self):
517
self._backing_transport.put_bytes(self._relpath,
518
self._body_bytes, self._mode)
519
self.response = SmartServerResponse(('ok',))
521
def _deserialise_offsets(self, text):
522
# XXX: FIXME this should be on the protocol object.
524
for line in text.split('\n'):
527
start, length = line.split(',')
528
offsets.append((int(start), int(length)))
531
def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
532
self._converted_command = True
533
self._end_of_body_handler = self._handle_put_non_atomic
534
self._relpath = relpath
535
self._dir_mode = self._deserialise_optional_mode(dir_mode)
536
self._mode = self._deserialise_optional_mode(mode)
537
# a boolean would be nicer XXX
538
self._create_parent = (create_parent == 'T')
540
def _handle_put_non_atomic(self):
541
self._backing_transport.put_bytes_non_atomic(self._relpath,
544
create_parent_dir=self._create_parent,
545
dir_mode=self._dir_mode)
546
self.response = SmartServerResponse(('ok',))
548
def do_readv(self, relpath):
549
self._converted_command = True
550
self._end_of_body_handler = self._handle_readv_offsets
551
self._relpath = relpath
553
def end_of_body(self):
554
"""No more body data will be received."""
555
self._run_handler_code(self._end_of_body_handler, (), {})
556
# cannot read after this.
557
self.finished_reading = True
559
def _handle_readv_offsets(self):
560
"""accept offsets for a readv request."""
561
offsets = self._deserialise_offsets(self._body_bytes)
562
backing_bytes = ''.join(bytes for offset, bytes in
563
self._backing_transport.readv(self._relpath, offsets))
564
self.response = SmartServerResponse(('readv',), backing_bytes)
566
def do_rename(self, rel_from, rel_to):
567
self._backing_transport.rename(rel_from, rel_to)
569
def do_rmdir(self, relpath):
570
self._backing_transport.rmdir(relpath)
572
def do_stat(self, relpath):
573
stat = self._backing_transport.stat(relpath)
574
return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
576
def do_get_bundle(self, path, revision_id):
577
# open transport relative to our base
578
t = self._backing_transport.clone(path)
579
control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
580
repo = control.open_repository()
581
tmpf = tempfile.TemporaryFile()
582
base_revision = revision.NULL_REVISION
583
write_bundle(repo, revision_id, base_revision, tmpf)
585
return SmartServerResponse((), tmpf.read())
587
def dispatch_command(self, cmd, args):
588
"""Deprecated compatibility method.""" # XXX XXX
589
func = getattr(self, 'do_' + cmd, None)
591
raise errors.SmartProtocolError("bad request %r" % (cmd,))
592
self._run_handler_code(func, args, {})
594
def _run_handler_code(self, callable, args, kwargs):
595
"""Run some handler specific code 'callable'.
597
If a result is returned, it is considered to be the commands response,
598
and finished_reading is set true, and its assigned to self.response.
600
Any exceptions caught are translated and a response object created
603
result = self._call_converting_errors(callable, args, kwargs)
604
if result is not None:
605
self.response = result
606
self.finished_reading = True
607
# handle unconverted commands
608
if not self._converted_command:
609
self.finished_reading = True
611
self.response = SmartServerResponse(('ok',))
613
def _call_converting_errors(self, callable, args, kwargs):
614
"""Call callable converting errors to Response objects."""
616
return callable(*args, **kwargs)
617
except errors.NoSuchFile, e:
618
return SmartServerResponse(('NoSuchFile', e.path))
619
except errors.FileExists, e:
620
return SmartServerResponse(('FileExists', e.path))
621
except errors.DirectoryNotEmpty, e:
622
return SmartServerResponse(('DirectoryNotEmpty', e.path))
623
except errors.ShortReadvError, e:
624
return SmartServerResponse(('ShortReadvError',
625
e.path, str(e.offset), str(e.length), str(e.actual)))
626
except UnicodeError, e:
627
# If it is a DecodeError, than most likely we are starting
628
# with a plain string
629
str_or_unicode = e.object
630
if isinstance(str_or_unicode, unicode):
631
# XXX: UTF-8 might have \x01 (our seperator byte) in it. We
632
# should escape it somehow.
633
val = 'u:' + str_or_unicode.encode('utf-8')
635
val = 's:' + str_or_unicode.encode('base64')
636
# This handles UnicodeEncodeError or UnicodeDecodeError
637
return SmartServerResponse((e.__class__.__name__,
638
e.encoding, val, str(e.start), str(e.end), e.reason))
639
except errors.TransportNotPossible, e:
640
if e.msg == "readonly transport":
641
return SmartServerResponse(('ReadOnlyError', ))
646
class SmartTCPServer(object):
647
"""Listens on a TCP socket and accepts connections from smart clients"""
649
def __init__(self, backing_transport, host='127.0.0.1', port=0):
650
"""Construct a new server.
652
To actually start it running, call either start_background_thread or
655
:param host: Name of the interface to listen on.
656
:param port: TCP port to listen on, or 0 to allocate a transient port.
658
self._server_socket = socket.socket()
659
self._server_socket.bind((host, port))
660
self.port = self._server_socket.getsockname()[1]
661
self._server_socket.listen(1)
662
self._server_socket.settimeout(1)
663
self.backing_transport = backing_transport
666
# let connections timeout so that we get a chance to terminate
667
# Keep a reference to the exceptions we want to catch because the socket
668
# module's globals get set to None during interpreter shutdown.
669
from socket import timeout as socket_timeout
670
from socket import error as socket_error
671
self._should_terminate = False
672
while not self._should_terminate:
674
self.accept_and_serve()
675
except socket_timeout:
676
# just check if we're asked to stop
678
except socket_error, e:
679
trace.warning("client disconnected: %s", e)
683
"""Return the url of the server"""
684
return "bzr://%s:%d/" % self._server_socket.getsockname()
686
def accept_and_serve(self):
687
conn, client_addr = self._server_socket.accept()
688
# For WIN32, where the timeout value from the listening socket
689
# propogates to the newly accepted socket.
690
conn.setblocking(True)
691
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
692
handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
693
connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
694
connection_thread.setDaemon(True)
695
connection_thread.start()
697
def start_background_thread(self):
698
self._server_thread = threading.Thread(None,
700
name='server-' + self.get_url())
701
self._server_thread.setDaemon(True)
702
self._server_thread.start()
704
def stop_background_thread(self):
705
self._should_terminate = True
706
# At one point we would wait to join the threads here, but it looks
707
# like they don't actually exit. So now we just leave them running
708
# and expect to terminate the process. -- mbp 20070215
709
# self._server_socket.close()
710
## sys.stderr.write("waiting for server thread to finish...")
711
## self._server_thread.join()
714
class SmartTCPServer_for_testing(SmartTCPServer):
    """Server suitable for use by transport tests.

    This server is backed by the process's cwd.
    """

    def __init__(self):
        # NOTE(review): [7:] presumably strips a leading 'file://' from the
        # URL -- confirm.
        self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
        # The server is set up by default like for ssh access: the client
        # passes filesystem-absolute paths; therefore the server must look
        # them up relative to the root directory.  it might be better to act
        # a public server and have the server rewrite paths into the test
        # directory.
        SmartTCPServer.__init__(self,
            transport.get_transport(urlutils.local_path_to_url('/')))

    def get_backing_transport(self, backing_transport_server):
        """Get a backing transport from a server we are decorating."""
        return transport.get_transport(backing_transport_server.get_url())

    def setUp(self, backing_transport_server=None):
        """Set up server for testing"""
        from bzrlib.transport.chroot import TestingChrootServer
        if backing_transport_server is None:
            from bzrlib.transport.local import LocalURLServer
            backing_transport_server = LocalURLServer()
        self.chroot_server = TestingChrootServer()
        self.chroot_server.setUp(backing_transport_server)
        # Replaces the transport installed by __init__ with one pointing at
        # the chroot server's URL.
        self.backing_transport = transport.get_transport(
            self.chroot_server.get_url())
        self.start_background_thread()

    def tearDown(self):
        """Stop the background server thread."""
        self.stop_background_thread()

    def get_bogus_url(self):
        """Return a URL which will fail to connect"""
        return 'bzr://127.0.0.1:1/'
754
class _SmartStat(object):
756
def __init__(self, size, mode):
761
class RemoteTransport(transport.Transport):
762
"""Connection to a smart server.
764
The connection holds references to the medium that can be used to send
765
requests to the server.
767
The connection has a notion of the current directory to which it's
768
connected; this is incorporated in filenames passed to the server.
770
This supports some higher-level RPC operations and can also be treated
771
like a Transport to do file-like operations.
773
The connection can be made over a tcp socket, an ssh pipe or a series of
774
http requests. There are concrete subclasses for each type:
775
SmartTCPTransport, etc.
778
# IMPORTANT FOR IMPLEMENTORS: RemoteTransport MUST NOT be given encoding
779
# responsibilities: Put those on SmartClient or similar. This is vital for
780
# the ability to support multiple versions of the smart protocol over time:
781
# RemoteTransport is an adapter from the Transport object model to the
782
# SmartClient model, not an encoder.
784
def __init__(self, url, clone_from=None, medium=None):
    """Constructor.

    :param url: base URL; a trailing '/' is added when missing.
    :param clone_from: optional RemoteTransport whose medium and username
        are reused.
    :param medium: The medium to use for this RemoteTransport. This must be
        supplied if clone_from is None.
    """
    ### Technically super() here is faulty because Transport's __init__
    ### fails to take 2 parameters, and if super were to choose a silly
    ### initialisation order things would blow up.
    if not url.endswith('/'):
        url += '/'
    super(RemoteTransport, self).__init__(url)
    self._scheme, self._username, self._password, self._host, self._port, self._path = \
            transport.split_url(url)
    if clone_from is None:
        self._medium = medium
    else:
        # credentials may be stripped from the base in some circumstances
        # as yet to be clearly defined or documented, so copy them.
        self._username = clone_from._username
        # reuse same connection
        self._medium = clone_from._medium
    assert self._medium is not None
808
def abspath(self, relpath):
    """Return the full url to the given relative path.

    @param relpath: the relative path or path components
    @type relpath: str or list
    """
    return self._unparse_url(self._remote_path(relpath))
816
def clone(self, relative_url):
    """Make a new RemoteTransport related to me, sharing the same connection.

    This essentially opens a handle on a different remote directory.

    :param relative_url: path relative to this transport, or None to clone
        at the same base.
    """
    if relative_url is None:
        return RemoteTransport(self.base, self)
    else:
        return RemoteTransport(self.abspath(relative_url), self)
826
def is_readonly(self):
    """Ask the server whether the transport is read-only.

    :return: True for a ('yes',) response, False for ('no',).  Any other
        response is passed to _translate_error; if that does not raise,
        the final assertion fails.
    """
    resp = self._call2('Transport.is_readonly')
    if resp == ('yes', ):
        return True
    elif resp == ('no', ):
        return False
    else:
        self._translate_error(resp)
    assert False, 'weird response %r' % (resp,)
837
def get_smart_client(self):
840
def get_smart_medium(self):
843
def _unparse_url(self, path):
    """Return URL for a path.

    :see: SFTPUrlHandling._unparse_url
    """
    # TODO: Eventually it should be possible to unify this with
    # SFTPUrlHandling._unparse_url?
    if path == '':
        path = '/'
    path = urllib.quote(path)
    netloc = urllib.quote(self._host)
    if self._username is not None:
        netloc = '%s@%s' % (urllib.quote(self._username), netloc)
    if self._port is not None:
        netloc = '%s:%d' % (netloc, self._port)
    return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
860
def _remote_path(self, relpath):
861
"""Returns the Unicode version of the absolute path for relpath."""
862
return self._combine_paths(self._path, relpath)
864
def _call(self, method, *args):
865
resp = self._call2(method, *args)
866
self._translate_error(resp)
868
def _call2(self, method, *args):
    """Call a method on the remote server.

    :return: whatever SmartClient.call returns for the request.
    """
    return client.SmartClient(self._medium).call(method, *args)
872
def _call_with_body_bytes(self, method, args, body):
    """Call a method on the remote server with body bytes.

    :return: whatever SmartClient.call_with_body_bytes returns.
    """
    smart_client = client.SmartClient(self._medium)
    return smart_client.call_with_body_bytes(method, args, body)
877
def has(self, relpath):
    """Indicate whether a remote file of the given name exists or not.

    :see: Transport.has()
    """
    resp = self._call2('has', self._remote_path(relpath))
    if resp == ('yes', ):
        return True
    elif resp == ('no', ):
        return False
    else:
        self._translate_error(resp)
890
def get(self, relpath):
    """Return file-like object reading the contents of a remote file.

    :see: Transport.get_bytes()/get_file()
    """
    return StringIO(self.get_bytes(relpath))
897
def get_bytes(self, relpath):
    """Return the contents of the remote file relpath.

    Sends a 'get' request; a response other than ('ok',) is passed to
    _translate_error together with relpath.
    """
    remote = self._remote_path(relpath)
    request = self._medium.get_request()
    smart_protocol = protocol.SmartClientRequestProtocolOne(request)
    smart_protocol.call('get', remote)
    resp = smart_protocol.read_response_tuple(True)
    if resp != ('ok', ):
        # Only an error response abandons the body; a successful one must
        # still be read below.
        smart_protocol.cancel_read_body()
        self._translate_error(resp, relpath)
    return smart_protocol.read_body_bytes()
908
def _serialise_optional_mode(self, mode):
914
def mkdir(self, relpath, mode=None):
    """Send a 'mkdir' request for relpath with the serialised mode."""
    resp = self._call2('mkdir', self._remote_path(relpath),
        self._serialise_optional_mode(mode))
    self._translate_error(resp)
919
def put_bytes(self, relpath, upload_contents, mode=None):
    """Send a 'put' request with upload_contents as the request body."""
    # FIXME: upload_file is probably not safe for non-ascii characters -
    # should probably just pass all parameters as length-delimited
    # strings?
    resp = self._call_with_body_bytes('put',
        (self._remote_path(relpath), self._serialise_optional_mode(mode)),
        upload_contents)
    self._translate_error(resp)
928
def put_bytes_non_atomic(self, relpath, bytes, mode=None,
                         create_parent_dir=False,
                         dir_mode=None):
    """See Transport.put_bytes_non_atomic."""
    # FIXME: no encoding in the transport!
    create_parent_str = 'F'
    if create_parent_dir:
        create_parent_str = 'T'

    resp = self._call_with_body_bytes(
        'put_non_atomic',
        (self._remote_path(relpath), self._serialise_optional_mode(mode),
         create_parent_str, self._serialise_optional_mode(dir_mode)),
        bytes)
    self._translate_error(resp)
944
def put_file(self, relpath, upload_file, mode=None):
    """Upload the remaining contents of upload_file via put_bytes.

    If the upload raises, the file is seeked back to where it was before
    the read and the exception is re-raised.
    """
    # its not ideal to seek back, but currently put_non_atomic_file depends
    # on transports not reading before failing - which is a faulty
    # assumption I think - RBC 20060915
    pos = upload_file.tell()
    try:
        return self.put_bytes(relpath, upload_file.read(), mode)
    except:
        # Deliberately broad: restore the position for any failure, then
        # let the original exception propagate.
        upload_file.seek(pos)
        raise
955
def put_file_non_atomic(self, relpath, f, mode=None,
                        create_parent_dir=False,
                        dir_mode=None):
    """Read all of f and hand it to put_bytes_non_atomic."""
    return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
                                     create_parent_dir=create_parent_dir,
                                     dir_mode=dir_mode)
962
def append_file(self, relpath, from_file, mode=None):
    """Read all of from_file and hand it to append_bytes."""
    return self.append_bytes(relpath, from_file.read(), mode)
965
def append_bytes(self, relpath, bytes, mode=None):
    """Send an 'append' request with bytes as the request body.

    :return: the integer carried by an ('appended', n) response.  Any other
        response is passed to _translate_error.
    """
    resp = self._call_with_body_bytes(
        'append',
        (self._remote_path(relpath), self._serialise_optional_mode(mode)),
        bytes)
    if resp[0] == 'appended':
        return int(resp[1])
    self._translate_error(resp)
974
def delete(self, relpath):
    """Send a 'delete' request for relpath."""
    resp = self._call2('delete', self._remote_path(relpath))
    self._translate_error(resp)
978
def readv(self, relpath, offsets):
    """Yield (offset, data) for each requested range of relpath.

    The ranges are coalesced and fetched with a single 'readv' request;
    results are yielded in the order the offsets were given.

    :param offsets: iterable of (offset, size) pairs.
    :raises errors.ShortReadvError: if the server returned fewer bytes than
        a coalesced range needs.
    """
    if not offsets:
        # Nothing requested: finish without contacting the server (and
        # without calling next() on an empty iterator below).
        return

    offsets = list(offsets)

    sorted_offsets = sorted(offsets)
    # turn the list of offsets into a stack
    offset_stack = iter(offsets)
    cur_offset_and_size = offset_stack.next()
    coalesced = list(self._coalesce_offsets(sorted_offsets,
                           limit=self._max_readv_combine,
                           fudge_factor=self._bytes_to_read_before_seek))

    request = self._medium.get_request()
    smart_protocol = protocol.SmartClientRequestProtocolOne(request)
    smart_protocol.call_with_body_readv_array(
        ('readv', self._remote_path(relpath)),
        [(c.start, c.length) for c in coalesced])
    resp = smart_protocol.read_response_tuple(True)

    if resp[0] != 'readv':
        # This should raise an exception
        smart_protocol.cancel_read_body()
        self._translate_error(resp)
        return

    # FIXME: this should know how many bytes are needed, for clarity.
    data = smart_protocol.read_body_bytes()
    # Cache the results, but only until they have been fulfilled
    data_map = {}
    for c_offset in coalesced:
        if len(data) < c_offset.length:
            raise errors.ShortReadvError(relpath, c_offset.start,
                        c_offset.length, actual=len(data))
        for suboffset, subsize in c_offset.ranges:
            key = (c_offset.start+suboffset, subsize)
            data_map[key] = data[suboffset:suboffset+subsize]
        data = data[c_offset.length:]

        # Now that we've read some data, see if we can yield anything back
        while cur_offset_and_size in data_map:
            this_data = data_map.pop(cur_offset_and_size)
            yield cur_offset_and_size[0], this_data
            # When the requested offsets run out, next() raises
            # StopIteration, which ends this generator.
            cur_offset_and_size = offset_stack.next()
1024
def rename(self, rel_from, rel_to):
    """Send a 'rename' request for the two remote paths."""
    self._call('rename',
               self._remote_path(rel_from),
               self._remote_path(rel_to))
1029
# NOTE(review): the line that opens this call (embedded original line 1030)
# is not visible in this extract; only its two path arguments are. Both
# paths go through _remote_path before being sent. Presumably this mirrors
# rename() with a 'move' verb -- confirm against the complete file.
def move(self, rel_from, rel_to):
1031
self._remote_path(rel_from),
1032
self._remote_path(rel_to))
1034
def rmdir(self, relpath):
    """Send an 'rmdir' request for relpath to the smart server.

    :param relpath: directory to remove, relative to this transport; it is
        converted with _remote_path before being sent.
    """
    # The reply used to be bound to a local that nothing read; the call is
    # made purely for its effect, so the binding is dropped.
    self._call('rmdir', self._remote_path(relpath))
1037
# Maps the error verb of a smart-server reply tuple onto a Python exception.
# NOTE(review): embedded original lines 1039-1044, 1048, 1062, 1064 and 1076
# are not visible in this extract. The bindings of `what`, `val` and `end`,
# the first branch of the if/elif chain, and the branch words around lines
# 1048 and 1076 therefore cannot be seen here -- confirm against the
# complete file.
def _translate_error(self, resp, orig_path=None):
1038
"""Raise an exception from a response"""
1045
elif what == 'NoSuchFile':
1046
# A caller-supplied orig_path is reported in preference to resp[1].
if orig_path is not None:
1047
error_path = orig_path
1049
error_path = resp[1]
1050
raise errors.NoSuchFile(error_path)
1051
elif what == 'error':
1052
raise errors.SmartProtocolError(unicode(resp[1]))
1053
elif what == 'FileExists':
1054
raise errors.FileExists(resp[1])
1055
elif what == 'DirectoryNotEmpty':
1056
raise errors.DirectoryNotEmpty(resp[1])
1057
elif what == 'ShortReadvError':
1058
# Fields 2-4 arrive as text and are converted to integers.
raise errors.ShortReadvError(resp[1], int(resp[2]),
1059
int(resp[3]), int(resp[4]))
1060
# Rebuild a builtin Unicode error from its serialised fields.
elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
1061
encoding = str(resp[1]) # encoding must always be a string
1063
start = int(resp[3])
1065
reason = str(resp[5]) # reason must always be a string
1066
# A 'u:' prefix marks a utf-8 encoded payload, 's:' a base64 encoded one;
# a value with neither prefix is passed through unchanged.
if val.startswith('u:'):
1067
val = val[2:].decode('utf-8')
1068
elif val.startswith('s:'):
1069
val = val[2:].decode('base64')
1070
if what == 'UnicodeDecodeError':
1071
raise UnicodeDecodeError(encoding, val, start, end, reason)
1072
elif what == 'UnicodeEncodeError':
1073
raise UnicodeEncodeError(encoding, val, start, end, reason)
1074
elif what == "ReadOnlyError":
1075
raise errors.TransportNotPossible('readonly transport')
1077
# The whole reply tuple is included in the message for diagnosis.
raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
1079
def disconnect(self):
    """Disconnect the medium this transport communicates through."""
    self._medium.disconnect()
1082
def delete_tree(self, relpath):
    """Refuse to delete a tree.

    No request is sent to the server; relpath is not inspected.

    :raises errors.TransportNotPossible: always.
    """
    raise errors.TransportNotPossible('readonly transport')
1085
# Sends a 'stat' request for relpath and wraps a 'stat' reply in a
# _SmartStat; any other reply goes to _translate_error.
# NOTE(review): embedded original line 1089 is not visible in this extract
# (presumably an `else:`) -- confirm against the complete file.
def stat(self, relpath):
1086
resp = self._call2('stat', self._remote_path(relpath))
1087
if resp[0] == 'stat':
1088
# resp[1] is parsed as a decimal integer and resp[2] as octal text;
# presumably these are the size and the mode -- verify against _SmartStat.
return _SmartStat(int(resp[1]), int(resp[2], 8))
1090
self._translate_error(resp)
1092
## def lock_read(self, relpath):
1093
## """Lock the given file for shared (read) access.
1094
## :return: A lock object, which should be passed to Transport.unlock()
1096
## # The old RemoteBranch ignored locks for reading, so we will
1097
## # continue that tradition and return a bogus lock object.
1098
## class BogusLock(object):
1099
## def __init__(self, path):
1101
## def unlock(self):
1103
## return BogusLock(relpath)
1108
# Sends a 'list_dir' request for relpath; a 'names' reply carries the entry
# names in resp[1:], any other reply goes to _translate_error.
# NOTE(review): embedded original line 1112 is not visible in this extract
# (presumably an `else:`) -- confirm against the complete file.
def list_dir(self, relpath):
1109
resp = self._call2('list_dir', self._remote_path(relpath))
1110
if resp[0] == 'names':
1111
# Each name is encoded to ASCII; a name with non-ASCII characters would
# make encode() raise.
return [name.encode('ascii') for name in resp[1:]]
1113
self._translate_error(resp)
1115
# Sends an 'iter_files_recursive' request for the transport's own base
# (the remote form of the empty relative path).
# NOTE(review): embedded original lines 1118-1119 -- the body of the 'names'
# branch and whatever follows it -- are not visible in this extract, so what
# is returned on success cannot be stated from here.
def iter_files_recursive(self):
1116
resp = self._call2('iter_files_recursive', self._remote_path(''))
1117
if resp[0] == 'names':
1120
self._translate_error(resp)
1123
# RemoteTransport subclass whose constructor splits the URL, builds a
# medium.SmartTCPClientMedium from the host and port, and passes it to
# RemoteTransport.__init__ as `medium`.
# NOTE(review): embedded original lines 1128-1129, 1133 and 1135-1137 are
# not visible in this extract: the docstring terminator, the condition under
# which _port falls back to BZR_DEFAULT_PORT, and the statement guarded by
# the except clause. Presumably the fallback applies when the URL has no
# port and the guarded statement is an int() conversion -- confirm against
# the complete file.
# The `except (...), e` form is Python 2 only syntax.
class SmartTCPTransport(RemoteTransport):
1124
"""Connection to smart server over plain tcp.
1126
This is essentially just a factory to get 'RemoteTransport(url,
1127
SmartTCPClientMedium).
1130
def __init__(self, url):
1131
_scheme, _username, _password, _host, _port, _path = \
1132
transport.split_url(url)
1134
_port = BZR_DEFAULT_PORT
1138
except (ValueError, TypeError), e:
1139
raise errors.InvalidURL(
1140
path=url, extra="invalid port %s" % _port)
1141
client_medium = medium.SmartTCPClientMedium(_host, _port)
1142
super(SmartTCPTransport, self).__init__(url, medium=client_medium)
1145
# RemoteTransport subclass whose constructor splits the URL, builds a
# medium.SmartSSHClientMedium from host, port, username and password, and
# passes it to RemoteTransport.__init__ as `medium`.
# No default-port assignment appears in the visible lines; the port is only
# processed when the URL supplied one (`_port is not None`).
# NOTE(review): embedded original lines 1150-1151, 1155, 1157 and 1160 are
# not visible in this extract: the docstring terminator, the `try:` line,
# the guarded statement (presumably an int() conversion), and the end of the
# InvalidURL call -- confirm against the complete file.
# The `except (...), e` form is Python 2 only syntax.
class SmartSSHTransport(RemoteTransport):
1146
"""Connection to smart server over SSH.
1148
This is essentially just a factory to get 'RemoteTransport(url,
1149
SmartSSHClientMedium).
1152
def __init__(self, url):
1153
_scheme, _username, _password, _host, _port, _path = \
1154
transport.split_url(url)
1156
if _port is not None:
1158
except (ValueError, TypeError), e:
1159
raise errors.InvalidURL(path=url, extra="invalid port %s" %
1161
client_medium = medium.SmartSSHClientMedium(_host, _port,
1162
_username, _password)
1163
super(SmartSSHTransport, self).__init__(url, medium=client_medium)
1166
# RemoteTransport subclass for bzr+http:// URLs, layered on a plain http
# transport.
#   __init__: asserts the 'bzr+http://' prefix. When no http_transport is
#     passed, it strips the leading 'bzr+' from the URL and obtains one via
#     transport.get_transport; otherwise it keeps the one supplied. The smart
#     medium comes from that transport's get_smart_medium().
#   abspath: combines self._path with relpath and unparses the result into
#     a URL.
#   clone: builds a new SmartHTTPTransport for abspath(relative_url), handing
#     it a clone of the underlying http transport.
# NOTE(review): several embedded original lines are not visible in this
# extract: docstring terminators, the `else:` of __init__ (1183), the body
# of _remote_path (1190-1193), and parts of the clone() docstring and
# comment (1208-1210, 1212-1213, 1215). What _remote_path returns cannot be
# stated from here -- confirm against the complete file.
# NOTE(review): the URL prefix check uses `assert`, which is skipped when
# Python runs with -O.
class SmartHTTPTransport(RemoteTransport):
1167
"""Just a way to connect between a bzr+http:// url and http://.
1169
This connection operates slightly differently than the SmartSSHTransport.
1170
It uses a plain http:// transport underneath, which defines what remote
1171
.bzr/smart URL we are connected to. From there, all paths that are sent are
1172
sent as relative paths, this way, the remote side can properly
1173
de-reference them, since it is likely doing rewrite rules to translate an
1174
HTTP path into a local path.
1177
def __init__(self, url, http_transport=None):
1178
assert url.startswith('bzr+http://')
1180
if http_transport is None:
1181
http_url = url[len('bzr+'):]
1182
self._http_transport = transport.get_transport(http_url)
1184
self._http_transport = http_transport
1185
http_medium = self._http_transport.get_smart_medium()
1186
super(SmartHTTPTransport, self).__init__(url, medium=http_medium)
1188
def _remote_path(self, relpath):
1189
"""After connecting HTTP Transport only deals in relative URLs."""
1195
def abspath(self, relpath):
1196
"""Return the full url to the given relative path.
1198
:param relpath: the relative path or path components
1199
:type relpath: str or list
1201
return self._unparse_url(self._combine_paths(self._path, relpath))
1203
def clone(self, relative_url):
1204
"""Make a new SmartHTTPTransport related to me.
1206
This is re-implemented rather than using the default
1207
SmartTransport.clone() because we must be careful about the underlying
1211
abs_url = self.abspath(relative_url)
1214
# By cloning the underlying http_transport, we are able to share the
1216
new_transport = self._http_transport.clone(relative_url)
1217
return SmartHTTPTransport(abs_url, http_transport=new_transport)
1220
def get_test_permutations():
    """Return (transport, server) permutations for testing.

    :return: a one-element list pairing SmartTCPTransport with
        server.SmartTCPServer_for_testing.
    """
    # We may need a little more test framework support to construct an
    # appropriate RemoteTransport in the future.
    # Imported inside the function, so bzrlib.smart.server is only loaded
    # when the permutations are actually requested.
    from bzrlib.smart import server
    return [(SmartTCPTransport, server.SmartTCPServer_for_testing)]