20
20
imported from bzrlib.smart.
23
__all__ = ['RemoteTransport', 'RemoteTCPTransport', 'RemoteSSHTransport']
23
__all__ = ['RemoteTransport', 'SmartTCPTransport', 'SmartSSHTransport']
25
25
from cStringIO import StringIO
27
34
from bzrlib import (
36
from bzrlib.smart import client, medium
37
from bzrlib.symbol_versioning import (
42
from bzrlib.bundle.serializer import write_bundle
43
from bzrlib.smart import client, medium, protocol
45
# must do this otherwise urllib can't parse the urls properly :(
46
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh', 'bzr+http']:
47
transport.register_urlparse_netloc_protocol(scheme)
51
# Port 4155 is the default port for bzr://, registered with IANA.
52
BZR_DEFAULT_PORT = 4155
55
def _recv_tuple(from_file):
56
req_line = from_file.readline()
57
return _decode_tuple(req_line)
60
def _decode_tuple(req_line):
61
if req_line == None or req_line == '':
63
if req_line[-1] != '\n':
64
raise errors.SmartProtocolError("request %r not terminated" % req_line)
65
return tuple(req_line[:-1].split('\x01'))
68
def _encode_tuple(args):
69
"""Encode the tuple args to a bytestream."""
70
return '\x01'.join(args) + '\n'
73
class SmartProtocolBase(object):
74
"""Methods common to client and server"""
76
# TODO: this only actually accomodates a single block; possibly should
77
# support multiple chunks?
78
def _encode_bulk_data(self, body):
79
"""Encode body as a bulk data chunk."""
80
return ''.join(('%d\n' % len(body), body, 'done\n'))
82
def _serialise_offsets(self, offsets):
83
"""Serialise a readv offset list."""
85
for start, length in offsets:
86
txt.append('%d,%d' % (start, length))
90
class SmartServerRequestProtocolOne(SmartProtocolBase):
91
"""Server-side encoding and decoding logic for smart version 1."""
93
def __init__(self, backing_transport, write_func):
94
self._backing_transport = backing_transport
95
self.excess_buffer = ''
96
self._finished = False
98
self.has_dispatched = False
100
self._body_decoder = None
101
self._write_func = write_func
103
def accept_bytes(self, bytes):
104
"""Take bytes, and advance the internal state machine appropriately.
106
:param bytes: must be a byte string
108
assert isinstance(bytes, str)
109
self.in_buffer += bytes
110
if not self.has_dispatched:
111
if '\n' not in self.in_buffer:
112
# no command line yet
114
self.has_dispatched = True
116
first_line, self.in_buffer = self.in_buffer.split('\n', 1)
118
req_args = _decode_tuple(first_line)
119
self.request = SmartServerRequestHandler(
120
self._backing_transport)
121
self.request.dispatch_command(req_args[0], req_args[1:])
122
if self.request.finished_reading:
124
self.excess_buffer = self.in_buffer
126
self._send_response(self.request.response.args,
127
self.request.response.body)
128
except KeyboardInterrupt:
130
except Exception, exception:
131
# everything else: pass to client, flush, and quit
132
self._send_response(('error', str(exception)))
135
if self.has_dispatched:
137
# nothing to do.XXX: this routine should be a single state
139
self.excess_buffer += self.in_buffer
142
if self._body_decoder is None:
143
self._body_decoder = LengthPrefixedBodyDecoder()
144
self._body_decoder.accept_bytes(self.in_buffer)
145
self.in_buffer = self._body_decoder.unused_data
146
body_data = self._body_decoder.read_pending_data()
147
self.request.accept_body(body_data)
148
if self._body_decoder.finished_reading:
149
self.request.end_of_body()
150
assert self.request.finished_reading, \
151
"no more body, request not finished"
152
if self.request.response is not None:
153
self._send_response(self.request.response.args,
154
self.request.response.body)
155
self.excess_buffer = self.in_buffer
158
assert not self.request.finished_reading, \
159
"no response and we have finished reading."
161
def _send_response(self, args, body=None):
162
"""Send a smart server response down the output stream."""
163
assert not self._finished, 'response already sent'
164
self._finished = True
165
self._write_func(_encode_tuple(args))
167
assert isinstance(body, str), 'body must be a str'
168
bytes = self._encode_bulk_data(body)
169
self._write_func(bytes)
171
def next_read_size(self):
174
if self._body_decoder is None:
177
return self._body_decoder.next_read_size()
180
class LengthPrefixedBodyDecoder(object):
181
"""Decodes the length-prefixed bulk data."""
184
self.bytes_left = None
185
self.finished_reading = False
186
self.unused_data = ''
187
self.state_accept = self._state_accept_expecting_length
188
self.state_read = self._state_read_no_data
190
self._trailer_buffer = ''
192
def accept_bytes(self, bytes):
193
"""Decode as much of bytes as possible.
195
If 'bytes' contains too much data it will be appended to
198
finished_reading will be set when no more data is required. Further
199
data will be appended to self.unused_data.
201
# accept_bytes is allowed to change the state
202
current_state = self.state_accept
203
self.state_accept(bytes)
204
while current_state != self.state_accept:
205
current_state = self.state_accept
206
self.state_accept('')
208
def next_read_size(self):
209
if self.bytes_left is not None:
210
# Ideally we want to read all the remainder of the body and the
212
return self.bytes_left + 5
213
elif self.state_accept == self._state_accept_reading_trailer:
214
# Just the trailer left
215
return 5 - len(self._trailer_buffer)
216
elif self.state_accept == self._state_accept_expecting_length:
217
# There's still at least 6 bytes left ('\n' to end the length, plus
221
# Reading excess data. Either way, 1 byte at a time is fine.
224
def read_pending_data(self):
225
"""Return any pending data that has been decoded."""
226
return self.state_read()
228
def _state_accept_expecting_length(self, bytes):
229
self._in_buffer += bytes
230
pos = self._in_buffer.find('\n')
233
self.bytes_left = int(self._in_buffer[:pos])
234
self._in_buffer = self._in_buffer[pos+1:]
235
self.bytes_left -= len(self._in_buffer)
236
self.state_accept = self._state_accept_reading_body
237
self.state_read = self._state_read_in_buffer
239
def _state_accept_reading_body(self, bytes):
240
self._in_buffer += bytes
241
self.bytes_left -= len(bytes)
242
if self.bytes_left <= 0:
244
if self.bytes_left != 0:
245
self._trailer_buffer = self._in_buffer[self.bytes_left:]
246
self._in_buffer = self._in_buffer[:self.bytes_left]
247
self.bytes_left = None
248
self.state_accept = self._state_accept_reading_trailer
250
def _state_accept_reading_trailer(self, bytes):
251
self._trailer_buffer += bytes
252
# TODO: what if the trailer does not match "done\n"? Should this raise
253
# a ProtocolViolation exception?
254
if self._trailer_buffer.startswith('done\n'):
255
self.unused_data = self._trailer_buffer[len('done\n'):]
256
self.state_accept = self._state_accept_reading_unused
257
self.finished_reading = True
259
def _state_accept_reading_unused(self, bytes):
260
self.unused_data += bytes
262
def _state_read_no_data(self):
265
def _state_read_in_buffer(self):
266
result = self._in_buffer
271
class SmartServerStreamMedium(object):
272
"""Handles smart commands coming over a stream.
274
The stream may be a pipe connected to sshd, or a tcp socket, or an
275
in-process fifo for testing.
277
One instance is created for each connected client; it can serve multiple
278
requests in the lifetime of the connection.
280
The server passes requests through to an underlying backing transport,
281
which will typically be a LocalTransport looking at the server's filesystem.
284
def __init__(self, backing_transport):
285
"""Construct new server.
287
:param backing_transport: Transport for the directory served.
289
# backing_transport could be passed to serve instead of __init__
290
self.backing_transport = backing_transport
291
self.finished = False
294
"""Serve requests until the client disconnects."""
295
# Keep a reference to stderr because the sys module's globals get set to
296
# None during interpreter shutdown.
297
from sys import stderr
299
while not self.finished:
300
protocol = SmartServerRequestProtocolOne(self.backing_transport,
302
self._serve_one_request(protocol)
304
stderr.write("%s terminating on exception %s\n" % (self, e))
307
def _serve_one_request(self, protocol):
308
"""Read one request from input, process, send back a response.
310
:param protocol: a SmartServerRequestProtocol.
313
self._serve_one_request_unguarded(protocol)
314
except KeyboardInterrupt:
317
self.terminate_due_to_error()
319
def terminate_due_to_error(self):
320
"""Called when an unhandled exception from the protocol occurs."""
321
raise NotImplementedError(self.terminate_due_to_error)
324
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
326
def __init__(self, sock, backing_transport):
329
:param sock: the socket the server will read from. It will be put
332
SmartServerStreamMedium.__init__(self, backing_transport)
334
sock.setblocking(True)
337
def _serve_one_request_unguarded(self, protocol):
338
while protocol.next_read_size():
340
protocol.accept_bytes(self.push_back)
343
bytes = self.socket.recv(4096)
347
protocol.accept_bytes(bytes)
349
self.push_back = protocol.excess_buffer
351
def terminate_due_to_error(self):
352
"""Called when an unhandled exception from the protocol occurs."""
353
# TODO: This should log to a server log file, but no such thing
354
# exists yet. Andrew Bennetts 2006-09-29.
358
def _write_out(self, bytes):
359
self.socket.sendall(bytes)
362
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
364
def __init__(self, in_file, out_file, backing_transport):
365
"""Construct new server.
367
:param in_file: Python file from which requests can be read.
368
:param out_file: Python file to write responses.
369
:param backing_transport: Transport for the directory served.
371
SmartServerStreamMedium.__init__(self, backing_transport)
372
if sys.platform == 'win32':
373
# force binary mode for files
375
for f in (in_file, out_file):
376
fileno = getattr(f, 'fileno', None)
378
msvcrt.setmode(fileno(), os.O_BINARY)
382
def _serve_one_request_unguarded(self, protocol):
384
bytes_to_read = protocol.next_read_size()
385
if bytes_to_read == 0:
386
# Finished serving this request.
389
bytes = self._in.read(bytes_to_read)
391
# Connection has been closed.
395
protocol.accept_bytes(bytes)
397
def terminate_due_to_error(self):
398
# TODO: This should log to a server log file, but no such thing
399
# exists yet. Andrew Bennetts 2006-09-29.
403
def _write_out(self, bytes):
404
self._out.write(bytes)
407
class SmartServerResponse(object):
408
"""Response generated by SmartServerRequestHandler."""
410
def __init__(self, args, body=None):
414
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
415
# for delivering the data for a request. This could be done with as the
416
# StreamServer, though that would create conflation between request and response
417
# which may be undesirable.
420
class SmartServerRequestHandler(object):
421
"""Protocol logic for smart server.
423
This doesn't handle serialization at all, it just processes requests and
427
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
428
# not contain encoding or decoding logic to allow the wire protocol to vary
429
# from the object protocol: we will want to tweak the wire protocol separate
430
# from the object model, and ideally we will be able to do that without
431
# having a SmartServerRequestHandler subclass for each wire protocol, rather
432
# just a Protocol subclass.
434
# TODO: Better way of representing the body for commands that take it,
435
# and allow it to be streamed into the server.
437
def __init__(self, backing_transport):
438
self._backing_transport = backing_transport
439
self._converted_command = False
440
self.finished_reading = False
441
self._body_bytes = ''
444
def accept_body(self, bytes):
447
This should be overriden for each command that desired body data to
448
handle the right format of that data. I.e. plain bytes, a bundle etc.
450
The deserialisation into that format should be done in the Protocol
451
object. Set self.desired_body_format to the format your method will
454
# default fallback is to accumulate bytes.
455
self._body_bytes += bytes
457
def _end_of_body_handler(self):
458
"""An unimplemented end of body handler."""
459
raise NotImplementedError(self._end_of_body_handler)
462
"""Answer a version request with my version."""
463
return SmartServerResponse(('ok', '1'))
465
def do_has(self, relpath):
466
r = self._backing_transport.has(relpath) and 'yes' or 'no'
467
return SmartServerResponse((r,))
469
def do_get(self, relpath):
470
backing_bytes = self._backing_transport.get_bytes(relpath)
471
return SmartServerResponse(('ok',), backing_bytes)
473
def _deserialise_optional_mode(self, mode):
474
# XXX: FIXME this should be on the protocol object.
480
def do_append(self, relpath, mode):
481
self._converted_command = True
482
self._relpath = relpath
483
self._mode = self._deserialise_optional_mode(mode)
484
self._end_of_body_handler = self._handle_do_append_end
486
def _handle_do_append_end(self):
487
old_length = self._backing_transport.append_bytes(
488
self._relpath, self._body_bytes, self._mode)
489
self.response = SmartServerResponse(('appended', '%d' % old_length))
491
def do_delete(self, relpath):
492
self._backing_transport.delete(relpath)
494
def do_iter_files_recursive(self, relpath):
495
transport = self._backing_transport.clone(relpath)
496
filenames = transport.iter_files_recursive()
497
return SmartServerResponse(('names',) + tuple(filenames))
499
def do_list_dir(self, relpath):
500
filenames = self._backing_transport.list_dir(relpath)
501
return SmartServerResponse(('names',) + tuple(filenames))
503
def do_mkdir(self, relpath, mode):
504
self._backing_transport.mkdir(relpath,
505
self._deserialise_optional_mode(mode))
507
def do_move(self, rel_from, rel_to):
508
self._backing_transport.move(rel_from, rel_to)
510
def do_put(self, relpath, mode):
511
self._converted_command = True
512
self._relpath = relpath
513
self._mode = self._deserialise_optional_mode(mode)
514
self._end_of_body_handler = self._handle_do_put
516
def _handle_do_put(self):
517
self._backing_transport.put_bytes(self._relpath,
518
self._body_bytes, self._mode)
519
self.response = SmartServerResponse(('ok',))
521
def _deserialise_offsets(self, text):
522
# XXX: FIXME this should be on the protocol object.
524
for line in text.split('\n'):
527
start, length = line.split(',')
528
offsets.append((int(start), int(length)))
531
def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
532
self._converted_command = True
533
self._end_of_body_handler = self._handle_put_non_atomic
534
self._relpath = relpath
535
self._dir_mode = self._deserialise_optional_mode(dir_mode)
536
self._mode = self._deserialise_optional_mode(mode)
537
# a boolean would be nicer XXX
538
self._create_parent = (create_parent == 'T')
540
def _handle_put_non_atomic(self):
541
self._backing_transport.put_bytes_non_atomic(self._relpath,
544
create_parent_dir=self._create_parent,
545
dir_mode=self._dir_mode)
546
self.response = SmartServerResponse(('ok',))
548
def do_readv(self, relpath):
549
self._converted_command = True
550
self._end_of_body_handler = self._handle_readv_offsets
551
self._relpath = relpath
553
def end_of_body(self):
554
"""No more body data will be received."""
555
self._run_handler_code(self._end_of_body_handler, (), {})
556
# cannot read after this.
557
self.finished_reading = True
559
def _handle_readv_offsets(self):
560
"""accept offsets for a readv request."""
561
offsets = self._deserialise_offsets(self._body_bytes)
562
backing_bytes = ''.join(bytes for offset, bytes in
563
self._backing_transport.readv(self._relpath, offsets))
564
self.response = SmartServerResponse(('readv',), backing_bytes)
566
def do_rename(self, rel_from, rel_to):
567
self._backing_transport.rename(rel_from, rel_to)
569
def do_rmdir(self, relpath):
570
self._backing_transport.rmdir(relpath)
572
def do_stat(self, relpath):
573
stat = self._backing_transport.stat(relpath)
574
return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
576
def do_get_bundle(self, path, revision_id):
577
# open transport relative to our base
578
t = self._backing_transport.clone(path)
579
control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
580
repo = control.open_repository()
581
tmpf = tempfile.TemporaryFile()
582
base_revision = revision.NULL_REVISION
583
write_bundle(repo, revision_id, base_revision, tmpf)
585
return SmartServerResponse((), tmpf.read())
587
def dispatch_command(self, cmd, args):
588
"""Deprecated compatibility method.""" # XXX XXX
589
func = getattr(self, 'do_' + cmd, None)
591
raise errors.SmartProtocolError("bad request %r" % (cmd,))
592
self._run_handler_code(func, args, {})
594
def _run_handler_code(self, callable, args, kwargs):
595
"""Run some handler specific code 'callable'.
597
If a result is returned, it is considered to be the commands response,
598
and finished_reading is set true, and its assigned to self.response.
600
Any exceptions caught are translated and a response object created
603
result = self._call_converting_errors(callable, args, kwargs)
604
if result is not None:
605
self.response = result
606
self.finished_reading = True
607
# handle unconverted commands
608
if not self._converted_command:
609
self.finished_reading = True
611
self.response = SmartServerResponse(('ok',))
613
def _call_converting_errors(self, callable, args, kwargs):
614
"""Call callable converting errors to Response objects."""
616
return callable(*args, **kwargs)
617
except errors.NoSuchFile, e:
618
return SmartServerResponse(('NoSuchFile', e.path))
619
except errors.FileExists, e:
620
return SmartServerResponse(('FileExists', e.path))
621
except errors.DirectoryNotEmpty, e:
622
return SmartServerResponse(('DirectoryNotEmpty', e.path))
623
except errors.ShortReadvError, e:
624
return SmartServerResponse(('ShortReadvError',
625
e.path, str(e.offset), str(e.length), str(e.actual)))
626
except UnicodeError, e:
627
# If it is a DecodeError, than most likely we are starting
628
# with a plain string
629
str_or_unicode = e.object
630
if isinstance(str_or_unicode, unicode):
631
# XXX: UTF-8 might have \x01 (our seperator byte) in it. We
632
# should escape it somehow.
633
val = 'u:' + str_or_unicode.encode('utf-8')
635
val = 's:' + str_or_unicode.encode('base64')
636
# This handles UnicodeEncodeError or UnicodeDecodeError
637
return SmartServerResponse((e.__class__.__name__,
638
e.encoding, val, str(e.start), str(e.end), e.reason))
639
except errors.TransportNotPossible, e:
640
if e.msg == "readonly transport":
641
return SmartServerResponse(('ReadOnlyError', ))
646
class SmartTCPServer(object):
647
"""Listens on a TCP socket and accepts connections from smart clients"""
649
def __init__(self, backing_transport, host='127.0.0.1', port=0):
650
"""Construct a new server.
652
To actually start it running, call either start_background_thread or
655
:param host: Name of the interface to listen on.
656
:param port: TCP port to listen on, or 0 to allocate a transient port.
658
self._server_socket = socket.socket()
659
self._server_socket.bind((host, port))
660
self.port = self._server_socket.getsockname()[1]
661
self._server_socket.listen(1)
662
self._server_socket.settimeout(1)
663
self.backing_transport = backing_transport
666
# let connections timeout so that we get a chance to terminate
667
# Keep a reference to the exceptions we want to catch because the socket
668
# module's globals get set to None during interpreter shutdown.
669
from socket import timeout as socket_timeout
670
from socket import error as socket_error
671
self._should_terminate = False
672
while not self._should_terminate:
674
self.accept_and_serve()
675
except socket_timeout:
676
# just check if we're asked to stop
678
except socket_error, e:
679
trace.warning("client disconnected: %s", e)
683
"""Return the url of the server"""
684
return "bzr://%s:%d/" % self._server_socket.getsockname()
686
def accept_and_serve(self):
687
conn, client_addr = self._server_socket.accept()
688
# For WIN32, where the timeout value from the listening socket
689
# propogates to the newly accepted socket.
690
conn.setblocking(True)
691
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
692
handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
693
connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
694
connection_thread.setDaemon(True)
695
connection_thread.start()
697
def start_background_thread(self):
698
self._server_thread = threading.Thread(None,
700
name='server-' + self.get_url())
701
self._server_thread.setDaemon(True)
702
self._server_thread.start()
704
def stop_background_thread(self):
705
self._should_terminate = True
706
# At one point we would wait to join the threads here, but it looks
707
# like they don't actually exit. So now we just leave them running
708
# and expect to terminate the process. -- mbp 20070215
709
# self._server_socket.close()
710
## sys.stderr.write("waiting for server thread to finish...")
711
## self._server_thread.join()
714
class SmartTCPServer_for_testing(SmartTCPServer):
715
"""Server suitable for use by transport tests.
717
This server is backed by the process's cwd.
721
self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
722
# The server is set up by default like for ssh access: the client
723
# passes filesystem-absolute paths; therefore the server must look
724
# them up relative to the root directory. it might be better to act
725
# a public server and have the server rewrite paths into the test
727
SmartTCPServer.__init__(self,
728
transport.get_transport(urlutils.local_path_to_url('/')))
730
def get_backing_transport(self, backing_transport_server):
731
"""Get a backing transport from a server we are decorating."""
732
return transport.get_transport(backing_transport_server.get_url())
734
def setUp(self, backing_transport_server=None):
735
"""Set up server for testing"""
736
from bzrlib.transport.chroot import TestingChrootServer
737
if backing_transport_server is None:
738
from bzrlib.transport.local import LocalURLServer
739
backing_transport_server = LocalURLServer()
740
self.chroot_server = TestingChrootServer()
741
self.chroot_server.setUp(backing_transport_server)
742
self.backing_transport = transport.get_transport(
743
self.chroot_server.get_url())
744
self.start_background_thread()
747
self.stop_background_thread()
749
def get_bogus_url(self):
750
"""Return a URL which will fail to connect"""
751
return 'bzr://127.0.0.1:1/'
42
754
class _SmartStat(object):
55
767
The connection has a notion of the current directory to which it's
56
768
connected; this is incorporated in filenames passed to the server.
58
This supports some higher-level RPC operations and can also be treated
770
This supports some higher-level RPC operations and can also be treated
59
771
like a Transport to do file-like operations.
61
773
The connection can be made over a tcp socket, an ssh pipe or a series of
62
774
http requests. There are concrete subclasses for each type:
63
RemoteTCPTransport, etc.
775
SmartTCPTransport, etc.
66
# When making a readv request, cap it at requesting 5MB of data
67
_max_readv_bytes = 5*1024*1024
69
778
# IMPORTANT FOR IMPLEMENTORS: RemoteTransport MUST NOT be given encoding
70
779
# responsibilities: Put those on SmartClient or similar. This is vital for
71
780
# the ability to support multiple versions of the smart protocol over time:
72
# RemoteTransport is an adapter from the Transport object model to the
781
# RemoteTransport is an adapter from the Transport object model to the
73
782
# SmartClient model, not an encoder.
75
# FIXME: the medium parameter should be private, only the tests requires
76
# it. It may be even clearer to define a TestRemoteTransport that handles
77
# the specific cases of providing a _client and/or a _medium, and leave
78
# RemoteTransport as an abstract class.
79
def __init__(self, url, _from_transport=None, medium=None, _client=None):
784
def __init__(self, url, clone_from=None, medium=None):
82
:param _from_transport: Another RemoteTransport instance that this
83
one is being cloned from. Attributes such as the medium will
86
:param medium: The medium to use for this RemoteTransport. If None,
87
the medium from the _from_transport is shared. If both this
88
and _from_transport are None, a new medium will be built.
89
_from_transport and medium cannot both be specified.
91
:param _client: Override the _SmartClient used by this transport. This
92
should only be used for testing purposes; normally this is
93
determined from the medium.
95
super(RemoteTransport, self).__init__(
96
url, _from_transport=_from_transport)
98
# The medium is the connection, except when we need to share it with
99
# other objects (RemoteBzrDir, RemoteRepository etc). In these cases
100
# what we want to share is really the shared connection.
102
if (_from_transport is not None
103
and isinstance(_from_transport, RemoteTransport)):
104
_client = _from_transport._client
105
elif _from_transport is None:
106
# If no _from_transport is specified, we need to intialize the
110
medium, credentials = self._build_medium()
111
if 'hpss' in debug.debug_flags:
112
trace.mutter('hpss: Built a new medium: %s',
113
medium.__class__.__name__)
114
self._shared_connection = transport._SharedConnection(medium,
118
# No medium was specified, so share the medium from the
120
medium = self._shared_connection.connection
122
raise AssertionError(
123
"Both _from_transport (%r) and medium (%r) passed to "
124
"RemoteTransport.__init__, but these parameters are mutally "
125
"exclusive." % (_from_transport, medium))
128
self._client = client._SmartClient(medium)
130
self._client = _client
132
def _build_medium(self):
133
"""Create the medium if _from_transport does not provide one.
135
The medium is analogous to the connection for ConnectedTransport: it
136
allows connection sharing.
141
def _report_activity(self, bytes, direction):
142
"""See Transport._report_activity.
144
Does nothing; the smart medium will report activity triggered by a
787
:param medium: The medium to use for this RemoteTransport. This must be
788
supplied if clone_from is None.
790
### Technically super() here is faulty because Transport's __init__
791
### fails to take 2 parameters, and if super were to choose a silly
792
### initialisation order things would blow up.
793
if not url.endswith('/'):
795
super(RemoteTransport, self).__init__(url)
796
self._scheme, self._username, self._password, self._host, self._port, self._path = \
797
transport.split_url(url)
798
if clone_from is None:
799
self._medium = medium
801
# credentials may be stripped from the base in some circumstances
802
# as yet to be clearly defined or documented, so copy them.
803
self._username = clone_from._username
804
# reuse same connection
805
self._medium = clone_from._medium
806
assert self._medium is not None
808
def abspath(self, relpath):
809
"""Return the full url to the given relative path.
811
@param relpath: the relative path or path components
812
@type relpath: str or list
814
return self._unparse_url(self._remote_path(relpath))
816
def clone(self, relative_url):
817
"""Make a new RemoteTransport related to me, sharing the same connection.
819
This essentially opens a handle on a different remote directory.
821
if relative_url is None:
822
return RemoteTransport(self.base, self)
824
return RemoteTransport(self.abspath(relative_url), self)
149
826
def is_readonly(self):
150
827
"""Smart server transport can do read/write file operations."""
152
resp = self._call2('Transport.is_readonly')
153
except errors.UnknownSmartMethod:
154
# XXX: nasty hack: servers before 0.16 don't have a
155
# 'Transport.is_readonly' verb, so we do what clients before 0.16
828
resp = self._call2('Transport.is_readonly')
158
829
if resp == ('yes', ):
160
831
elif resp == ('no', ):
163
raise errors.UnexpectedSmartServerResponse(resp)
834
self._translate_error(resp)
835
assert False, 'weird response %r' % (resp,)
165
837
def get_smart_client(self):
166
return self._get_connection()
168
840
def get_smart_medium(self):
169
return self._get_connection()
843
def _unparse_url(self, path):
844
"""Return URL for a path.
846
:see: SFTPUrlHandling._unparse_url
848
# TODO: Eventually it should be possible to unify this with
849
# SFTPUrlHandling._unparse_url?
852
path = urllib.quote(path)
853
netloc = urllib.quote(self._host)
854
if self._username is not None:
855
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
856
if self._port is not None:
857
netloc = '%s:%d' % (netloc, self._port)
858
return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
171
860
def _remote_path(self, relpath):
172
861
"""Returns the Unicode version of the absolute path for relpath."""
309
970
if resp[0] == 'appended':
310
971
return int(resp[1])
311
raise errors.UnexpectedSmartServerResponse(resp)
972
self._translate_error(resp)
313
974
def delete(self, relpath):
314
975
resp = self._call2('delete', self._remote_path(relpath))
315
self._ensure_ok(resp)
317
def external_url(self):
318
"""See bzrlib.transport.Transport.external_url."""
319
# the external path for RemoteTransports is the base
322
def recommended_page_size(self):
323
"""Return the recommended page size for this transport."""
326
def _readv(self, relpath, offsets):
976
self._translate_error(resp)
978
def readv(self, relpath, offsets):
330
982
offsets = list(offsets)
332
984
sorted_offsets = sorted(offsets)
985
# turn the list of offsets into a stack
986
offset_stack = iter(offsets)
987
cur_offset_and_size = offset_stack.next()
333
988
coalesced = list(self._coalesce_offsets(sorted_offsets,
334
989
limit=self._max_readv_combine,
335
fudge_factor=self._bytes_to_read_before_seek,
336
max_size=self._max_readv_bytes))
338
# now that we've coallesced things, avoid making enormous requests
343
if c.length + cur_len > self._max_readv_bytes:
344
requests.append(cur_request)
348
cur_request.append(c)
351
requests.append(cur_request)
352
if 'hpss' in debug.debug_flags:
353
trace.mutter('%s.readv %s offsets => %s coalesced'
354
' => %s requests (%s)',
355
self.__class__.__name__, len(offsets), len(coalesced),
356
len(requests), sum(map(len, requests)))
990
fudge_factor=self._bytes_to_read_before_seek))
992
request = self._medium.get_request()
993
smart_protocol = protocol.SmartClientRequestProtocolOne(request)
994
smart_protocol.call_with_body_readv_array(
995
('readv', self._remote_path(relpath)),
996
[(c.start, c.length) for c in coalesced])
997
resp = smart_protocol.read_response_tuple(True)
999
if resp[0] != 'readv':
1000
# This should raise an exception
1001
smart_protocol.cancel_read_body()
1002
self._translate_error(resp)
1005
# FIXME: this should know how many bytes are needed, for clarity.
1006
data = smart_protocol.read_body_bytes()
357
1007
# Cache the results, but only until they have been fulfilled
359
# turn the list of offsets into a single stack to iterate
360
offset_stack = iter(offsets)
361
# using a list so it can be modified when passing down and coming back
362
next_offset = [offset_stack.next()]
363
for cur_request in requests:
365
result = self._client.call_with_body_readv_array(
366
('readv', self._remote_path(relpath),),
367
[(c.start, c.length) for c in cur_request])
368
resp, response_handler = result
369
except errors.ErrorFromSmartServer, err:
370
self._translate_error(err, relpath)
372
if resp[0] != 'readv':
373
# This should raise an exception
374
response_handler.cancel_read_body()
375
raise errors.UnexpectedSmartServerResponse(resp)
377
for res in self._handle_response(offset_stack, cur_request,
383
def _handle_response(self, offset_stack, coalesced, response_handler,
384
data_map, next_offset):
385
cur_offset_and_size = next_offset[0]
386
# FIXME: this should know how many bytes are needed, for clarity.
387
data = response_handler.read_body_bytes()
389
1009
for c_offset in coalesced:
390
1010
if len(data) < c_offset.length:
391
1011
raise errors.ShortReadvError(relpath, c_offset.start,
392
1012
c_offset.length, actual=len(data))
393
1013
for suboffset, subsize in c_offset.ranges:
394
1014
key = (c_offset.start+suboffset, subsize)
395
this_data = data[data_offset+suboffset:
396
data_offset+suboffset+subsize]
397
# Special case when the data is in-order, rather than packing
398
# into a map and then back out again. Benchmarking shows that
399
# this has 100% hit rate, but leave in the data_map work just
401
# TODO: Could we get away with using buffer() to avoid the
402
# memory copy? Callers would need to realize they may
403
# not have a real string.
404
if key == cur_offset_and_size:
405
yield cur_offset_and_size[0], this_data
406
cur_offset_and_size = next_offset[0] = offset_stack.next()
408
data_map[key] = this_data
409
data_offset += c_offset.length
1015
data_map[key] = data[suboffset:suboffset+subsize]
1016
data = data[c_offset.length:]
411
1018
# Now that we've read some data, see if we can yield anything back
412
1019
while cur_offset_and_size in data_map:
413
1020
this_data = data_map.pop(cur_offset_and_size)
414
1021
yield cur_offset_and_size[0], this_data
415
cur_offset_and_size = next_offset[0] = offset_stack.next()
1022
cur_offset_and_size = offset_stack.next()
417
1024
def rename(self, rel_from, rel_to):
418
1025
self._call('rename',