13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""The 'medium' layer for the smart servers and clients.
32
31
from bzrlib.lazy_import import lazy_import
33
32
lazy_import(globals(), """
36
38
from bzrlib import (
44
from bzrlib.smart import client, protocol
46
from bzrlib.smart import client, protocol, request, vfs
45
47
from bzrlib.transport import ssh
49
# We must not read any more than 64k at a time so we don't risk "no buffer
50
# space available" errors on some platforms. Windows in particular is likely
51
# to give error 10053 or 10055 if we read more than 64k from a socket.
52
_MAX_READ_SIZE = 64 * 1024
49
from bzrlib import osutils
51
# Throughout this module buffer size parameters are either limited to be at
52
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
53
# For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads
54
# from non-sockets as well.
55
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
55
57
def _get_protocol_factory_for_bytes(bytes):
56
58
"""Determine the right protocol factory for 'bytes'.
87
89
def _get_line(read_bytes_func):
88
90
"""Read bytes using read_bytes_func until a newline byte.
90
92
This isn't particularly efficient, so should only be used when the
91
93
expected size of the line is quite short.
93
95
:returns: a tuple of two strs: (line, excess)
160
162
line, excess = _get_line(self.read_bytes)
161
163
self._push_back(excess)
166
def _report_activity(self, bytes, direction):
167
"""Notify that this medium has activity.
169
Implementations should call this from all methods that actually do IO.
170
Be careful that it's not called twice, if one method is implemented on
173
:param bytes: Number of bytes read or written.
174
:param direction: 'read' or 'write' or None.
176
ui.ui_factory.report_transport_activity(self, bytes, direction)
165
179
class SmartServerStreamMedium(SmartMedium):
166
180
"""Handles smart commands coming over a stream.
171
185
One instance is created for each connected client; it can serve multiple
172
186
requests in the lifetime of the connection.
174
The server passes requests through to an underlying backing transport,
188
The server passes requests through to an underlying backing transport,
175
189
which will typically be a LocalTransport looking at the server's filesystem.
177
191
:ivar _push_back_buffer: a str of bytes that have been read from the stream
260
274
def _serve_one_request_unguarded(self, protocol):
261
275
while protocol.next_read_size():
262
276
# We can safely try to read large chunks. If there is less data
263
# than _MAX_READ_SIZE ready, the socket wil just return a short
264
# read immediately rather than block.
265
bytes = self.read_bytes(_MAX_READ_SIZE)
277
# than MAX_SOCKET_CHUNK ready, the socket will just return a
278
# short read immediately rather than block.
279
bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
267
281
self.finished = True
269
283
protocol.accept_bytes(bytes)
271
285
self._push_back(protocol.unused_data)
273
287
def _read_bytes(self, desired_count):
274
# We ignore the desired_count because on sockets it's more efficient to
275
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
276
return self.socket.recv(_MAX_READ_SIZE)
288
return osutils.read_bytes_from_socket(
289
self.socket, self._report_activity)
278
291
def terminate_due_to_error(self):
279
292
# TODO: This should log to a server log file, but no such thing
282
295
self.finished = True
284
297
def _write_out(self, bytes):
285
osutils.send_all(self.socket, bytes)
298
tstart = osutils.timer_func()
299
osutils.send_all(self.socket, bytes, self._report_activity)
300
if 'hpss' in debug.debug_flags:
301
thread_id = thread.get_ident()
302
trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
303
% ('wrote', thread_id, len(bytes),
304
osutils.timer_func() - tstart))
288
307
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
349
368
request.finished_reading()
351
370
It is up to the individual SmartClientMedium whether multiple concurrent
352
requests can exist. See SmartClientMedium.get_request to obtain instances
353
of SmartClientMediumRequest, and the concrete Medium you are using for
371
requests can exist. See SmartClientMedium.get_request to obtain instances
372
of SmartClientMediumRequest, and the concrete Medium you are using for
354
373
details on concurrency and pipelining.
365
384
def accept_bytes(self, bytes):
366
385
"""Accept bytes for inclusion in this request.
368
This method may not be be called after finished_writing() has been
387
This method may not be called after finished_writing() has been
369
388
called. It depends upon the Medium whether or not the bytes will be
370
389
immediately transmitted. Message based Mediums will tend to buffer the
371
390
bytes until finished_writing() is called.
402
421
def _finished_reading(self):
403
422
"""Helper for finished_reading.
405
finished_reading checks the state of the request to determine if
424
finished_reading checks the state of the request to determine if
406
425
finished_reading is allowed, and if it is hands off to _finished_reading
407
426
to perform the action.
422
441
def _finished_writing(self):
423
442
"""Helper for finished_writing.
425
finished_writing checks the state of the request to determine if
444
finished_writing checks the state of the request to determine if
426
445
finished_writing is allowed, and if it is hands off to _finished_writing
427
446
to perform the action.
459
478
if not line.endswith('\n'):
460
479
# end of file encountered reading from server
461
480
raise errors.ConnectionReset(
462
"please check connectivity and permissions",
463
"(and try -Dhpss if further diagnosis is required)")
481
"Unexpected end of message. Please check connectivity "
482
"and permissions, and report a bug if problems persist.")
466
485
def _read_line(self):
467
486
"""Helper for SmartClientMediumRequest.read_line.
469
488
By default this forwards to self._medium._get_line because we are
470
489
operating on the medium's stream.
495
514
medium_repr = repr(medium)
496
515
# Add this medium to the WeakKeyDictionary
497
self.counts[medium] = [0, medium_repr]
516
self.counts[medium] = dict(count=0, vfs_count=0,
517
medium_repr=medium_repr)
498
518
# Weakref callbacks are fired in reverse order of their association
499
519
# with the referenced object. So we add a weakref *after* adding to
500
520
# the WeakKeyDict so that we can report the value from it before the
504
524
def increment_call_count(self, params):
505
525
# Increment the count in the WeakKeyDictionary
506
526
value = self.counts[params.medium]
529
request_method = request.request_handlers.get(params.method)
531
# A method we don't know about doesn't count as a VFS method.
533
if issubclass(request_method, vfs.VfsRequest):
534
value['vfs_count'] += 1
509
536
def done(self, ref):
510
537
value = self.counts[ref]
511
count, medium_repr = value
538
count, vfs_count, medium_repr = (
539
value['count'], value['vfs_count'], value['medium_repr'])
512
540
# In case this callback is invoked for the same ref twice (by the
513
541
# weakref callback and by the atexit function), set the call count back
514
542
# to 0 so this item won't be reported twice.
544
value['vfs_count'] = 0
517
trace.note('HPSS calls: %d %s', count, medium_repr)
546
trace.note('HPSS calls: %d (%d vfs) %s',
547
count, vfs_count, medium_repr)
519
549
def flush_all(self):
520
550
for ref in list(self.counts.keys()):
523
553
_debug_counter = None
526
556
class SmartClientMedium(SmartMedium):
527
557
"""Smart client is a medium for sending smart protocol requests over."""
574
604
if (self._remote_version_is_before is not None and
575
605
version_tuple > self._remote_version_is_before):
606
# We have been told that the remote side is older than some version
607
# which is newer than a previously supplied older-than version.
608
# This indicates that some smart verb call is not guarded
609
# appropriately (it should simply not have been tried).
576
610
raise AssertionError(
577
611
"_remember_remote_is_before(%r) called, but "
578
612
"_remember_remote_is_before(%r) was called previously."
617
651
def disconnect(self):
618
652
"""If this medium maintains a persistent connection, close it.
620
654
The default implementation does nothing.
623
657
def remote_path_from_transport(self, transport):
624
658
"""Convert transport into a path suitable for using in a request.
626
660
Note that the resulting remote path doesn't encode the host name or
627
661
anything but path, so it is only safe to use it in requests sent over
628
662
the medium from the matching transport.
674
708
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
675
709
"""A client medium using simple pipes.
677
711
This client does not manage the pipes: it assumes they will always be open.
713
Note that if readable_pipe.read might raise IOError or OSError with errno
714
of EINTR, it must be safe to retry the read. Plain CPython fileobjects
715
(such as used for sys.stdin) are safe.
680
718
def __init__(self, readable_pipe, writeable_pipe, base):
685
723
def _accept_bytes(self, bytes):
686
724
"""See SmartClientStreamMedium.accept_bytes."""
687
725
self._writeable_pipe.write(bytes)
726
self._report_activity(len(bytes), 'write')
689
728
def _flush(self):
690
729
"""See SmartClientStreamMedium._flush()."""
693
732
def _read_bytes(self, count):
694
733
"""See SmartClientStreamMedium._read_bytes."""
695
return self._readable_pipe.read(count)
734
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
735
self._report_activity(len(bytes), 'read')
698
739
class SmartSSHClientMedium(SmartClientStreamMedium):
699
740
"""A client medium using SSH."""
701
742
def __init__(self, host, port=None, username=None, password=None,
702
743
base=None, vendor=None, bzr_remote_path=None):
703
744
"""Creates a client that will connect on the first use.
705
746
:param vendor: An optional override for the ssh vendor to use. See
706
747
bzrlib.transport.ssh for details on ssh vendors.
708
SmartClientStreamMedium.__init__(self, base)
709
749
self._connected = False
710
750
self._host = host
711
751
self._password = password
712
752
self._port = port
713
753
self._username = username
754
# for the benefit of progress making a short description of this
756
self._scheme = 'bzr+ssh'
757
# SmartClientStreamMedium stores the repr of this object in its
758
# _DebugCounter so we have to store all the values used in our repr
759
# method before calling the super init.
760
SmartClientStreamMedium.__init__(self, base)
714
761
self._read_from = None
715
762
self._ssh_connection = None
716
763
self._vendor = vendor
717
764
self._write_to = None
718
765
self._bzr_remote_path = bzr_remote_path
719
if self._bzr_remote_path is None:
720
symbol_versioning.warn(
721
'bzr_remote_path is required as of bzr 0.92',
722
DeprecationWarning, stacklevel=2)
723
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
768
if self._port is None:
771
maybe_port = ':%s' % self._port
772
return "%s(%s://%s@%s%s/)" % (
773
self.__class__.__name__,
725
779
def _accept_bytes(self, bytes):
726
780
"""See SmartClientStreamMedium.accept_bytes."""
727
781
self._ensure_connection()
728
782
self._write_to.write(bytes)
783
self._report_activity(len(bytes), 'write')
730
785
def disconnect(self):
731
786
"""See SmartClientMedium.disconnect()."""
761
816
if not self._connected:
762
817
raise errors.MediumNotConnected(self)
763
818
bytes_to_read = min(count, _MAX_READ_SIZE)
764
return self._read_from.read(bytes_to_read)
819
bytes = self._read_from.read(bytes_to_read)
820
self._report_activity(len(bytes), 'read')
767
824
# Port 4155 is the default port for bzr://, registered with IANA.
772
829
class SmartTCPClientMedium(SmartClientStreamMedium):
773
830
"""A client medium using TCP."""
775
832
def __init__(self, host, port, base):
776
833
"""Creates a client that will connect on the first use."""
777
834
SmartClientStreamMedium.__init__(self, base)
783
840
def _accept_bytes(self, bytes):
784
841
"""See SmartClientMedium.accept_bytes."""
785
842
self._ensure_connection()
786
osutils.send_all(self._socket, bytes)
843
osutils.send_all(self._socket, bytes, self._report_activity)
788
845
def disconnect(self):
789
846
"""See SmartClientMedium.disconnect()."""
803
860
port = int(self._port)
805
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
862
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
806
863
socket.SOCK_STREAM, 0, 0)
807
864
except socket.gaierror, (err_num, err_msg):
808
865
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
812
869
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
814
871
self._socket = socket.socket(family, socktype, proto)
815
self._socket.setsockopt(socket.IPPROTO_TCP,
872
self._socket.setsockopt(socket.IPPROTO_TCP,
816
873
socket.TCP_NODELAY, 1)
817
874
self._socket.connect(sockaddr)
818
875
except socket.error, err:
843
900
"""See SmartClientMedium.read_bytes."""
844
901
if not self._connected:
845
902
raise errors.MediumNotConnected(self)
846
# We ignore the desired_count because on sockets it's more efficient to
847
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
848
return self._socket.recv(_MAX_READ_SIZE)
903
return osutils.read_bytes_from_socket(
904
self._socket, self._report_activity)
851
907
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
873
929
def _finished_reading(self):
874
930
"""See SmartClientMediumRequest._finished_reading.
876
This clears the _current_request on self._medium to allow a new
932
This clears the _current_request on self._medium to allow a new
877
933
request to be created.
879
935
if self._medium._current_request is not self:
880
936
raise AssertionError()
881
937
self._medium._current_request = None
883
939
def _finished_writing(self):
884
940
"""See SmartClientMediumRequest._finished_writing.