/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Andrew Bennetts
  • Date: 2010-04-13 04:33:55 UTC
  • mfrom: (5147 +trunk)
  • mto: This revision was merged to the branch mainline in revision 5149.
  • Revision ID: andrew.bennetts@canonical.com-20100413043355-lg3id0uwtju0k3zs
MergeĀ lp:bzr.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""The 'medium' layer for the smart servers and clients.
18
18
 
25
25
"""
26
26
 
27
27
import os
28
 
import socket
29
28
import sys
30
29
import urllib
31
30
 
32
31
from bzrlib.lazy_import import lazy_import
33
32
lazy_import(globals(), """
34
33
import atexit
 
34
import socket
 
35
import thread
35
36
import weakref
 
37
 
36
38
from bzrlib import (
37
39
    debug,
38
40
    errors,
39
 
    osutils,
40
41
    symbol_versioning,
41
42
    trace,
 
43
    ui,
42
44
    urlutils,
43
45
    )
44
 
from bzrlib.smart import client, protocol
 
46
from bzrlib.smart import client, protocol, request, vfs
45
47
from bzrlib.transport import ssh
46
48
""")
47
 
 
48
 
 
49
 
# We must not read any more than 64k at a time so we don't risk "no buffer
50
 
# space available" errors on some platforms.  Windows in particular is likely
51
 
# to give error 10053 or 10055 if we read more than 64k from a socket.
52
 
_MAX_READ_SIZE = 64 * 1024
53
 
 
 
49
from bzrlib import osutils
 
50
 
 
51
# Throughout this module buffer size parameters are either limited to be at
 
52
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
 
53
# For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads
 
54
# from non-sockets as well.
 
55
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
54
56
 
55
57
def _get_protocol_factory_for_bytes(bytes):
56
58
    """Determine the right protocol factory for 'bytes'.
86
88
 
87
89
def _get_line(read_bytes_func):
88
90
    """Read bytes using read_bytes_func until a newline byte.
89
 
    
 
91
 
90
92
    This isn't particularly efficient, so should only be used when the
91
93
    expected size of the line is quite short.
92
 
    
 
94
 
93
95
    :returns: a tuple of two strs: (line, excess)
94
96
    """
95
97
    newline_pos = -1
111
113
 
112
114
    def __init__(self):
113
115
        self._push_back_buffer = None
114
 
        
 
116
 
115
117
    def _push_back(self, bytes):
116
118
        """Return unused bytes to the medium, because they belong to the next
117
119
        request(s).
151
153
 
152
154
    def _get_line(self):
153
155
        """Read bytes from this request's response until a newline byte.
154
 
        
 
156
 
155
157
        This isn't particularly efficient, so should only be used when the
156
158
        expected size of the line is quite short.
157
159
 
160
162
        line, excess = _get_line(self.read_bytes)
161
163
        self._push_back(excess)
162
164
        return line
163
 
 
 
165
 
 
166
    def _report_activity(self, bytes, direction):
 
167
        """Notify that this medium has activity.
 
168
 
 
169
        Implementations should call this from all methods that actually do IO.
 
170
        Be careful that it's not called twice, if one method is implemented on
 
171
        top of another.
 
172
 
 
173
        :param bytes: Number of bytes read or written.
 
174
        :param direction: 'read' or 'write' or None.
 
175
        """
 
176
        ui.ui_factory.report_transport_activity(self, bytes, direction)
 
177
 
164
178
 
165
179
class SmartServerStreamMedium(SmartMedium):
166
180
    """Handles smart commands coming over a stream.
171
185
    One instance is created for each connected client; it can serve multiple
172
186
    requests in the lifetime of the connection.
173
187
 
174
 
    The server passes requests through to an underlying backing transport, 
 
188
    The server passes requests through to an underlying backing transport,
175
189
    which will typically be a LocalTransport looking at the server's filesystem.
176
190
 
177
191
    :ivar _push_back_buffer: a str of bytes that have been read from the stream
222
236
 
223
237
    def _serve_one_request(self, protocol):
224
238
        """Read one request from input, process, send back a response.
225
 
        
 
239
 
226
240
        :param protocol: a SmartServerRequestProtocol.
227
241
        """
228
242
        try:
260
274
    def _serve_one_request_unguarded(self, protocol):
261
275
        while protocol.next_read_size():
262
276
            # We can safely try to read large chunks.  If there is less data
263
 
            # than _MAX_READ_SIZE ready, the socket wil just return a short
264
 
            # read immediately rather than block.
265
 
            bytes = self.read_bytes(_MAX_READ_SIZE)
 
277
            # than MAX_SOCKET_CHUNK ready, the socket will just return a
 
278
            # short read immediately rather than block.
 
279
            bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
266
280
            if bytes == '':
267
281
                self.finished = True
268
282
                return
269
283
            protocol.accept_bytes(bytes)
270
 
        
 
284
 
271
285
        self._push_back(protocol.unused_data)
272
286
 
273
287
    def _read_bytes(self, desired_count):
274
 
        # We ignore the desired_count because on sockets it's more efficient to
275
 
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
276
 
        return self.socket.recv(_MAX_READ_SIZE)
 
288
        return osutils.read_bytes_from_socket(
 
289
            self.socket, self._report_activity)
277
290
 
278
291
    def terminate_due_to_error(self):
279
292
        # TODO: This should log to a server log file, but no such thing
282
295
        self.finished = True
283
296
 
284
297
    def _write_out(self, bytes):
285
 
        osutils.send_all(self.socket, bytes)
 
298
        tstart = osutils.timer_func()
 
299
        osutils.send_all(self.socket, bytes, self._report_activity)
 
300
        if 'hpss' in debug.debug_flags:
 
301
            thread_id = thread.get_ident()
 
302
            trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
 
303
                         % ('wrote', thread_id, len(bytes),
 
304
                            osutils.timer_func() - tstart))
286
305
 
287
306
 
288
307
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
349
368
    request.finished_reading()
350
369
 
351
370
    It is up to the individual SmartClientMedium whether multiple concurrent
352
 
    requests can exist. See SmartClientMedium.get_request to obtain instances 
353
 
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
371
    requests can exist. See SmartClientMedium.get_request to obtain instances
 
372
    of SmartClientMediumRequest, and the concrete Medium you are using for
354
373
    details on concurrency and pipelining.
355
374
    """
356
375
 
365
384
    def accept_bytes(self, bytes):
366
385
        """Accept bytes for inclusion in this request.
367
386
 
368
 
        This method may not be be called after finished_writing() has been
 
387
        This method may not be called after finished_writing() has been
369
388
        called.  It depends upon the Medium whether or not the bytes will be
370
389
        immediately transmitted. Message based Mediums will tend to buffer the
371
390
        bytes until finished_writing() is called.
402
421
    def _finished_reading(self):
403
422
        """Helper for finished_reading.
404
423
 
405
 
        finished_reading checks the state of the request to determine if 
 
424
        finished_reading checks the state of the request to determine if
406
425
        finished_reading is allowed, and if it is hands off to _finished_reading
407
426
        to perform the action.
408
427
        """
422
441
    def _finished_writing(self):
423
442
        """Helper for finished_writing.
424
443
 
425
 
        finished_writing checks the state of the request to determine if 
 
444
        finished_writing checks the state of the request to determine if
426
445
        finished_writing is allowed, and if it is hands off to _finished_writing
427
446
        to perform the action.
428
447
        """
448
467
        read_bytes checks the state of the request to determing if bytes
449
468
        should be read. After that it hands off to _read_bytes to do the
450
469
        actual read.
451
 
        
 
470
 
452
471
        By default this forwards to self._medium.read_bytes because we are
453
472
        operating on the medium's stream.
454
473
        """
459
478
        if not line.endswith('\n'):
460
479
            # end of file encountered reading from server
461
480
            raise errors.ConnectionReset(
462
 
                "please check connectivity and permissions",
463
 
                "(and try -Dhpss if further diagnosis is required)")
 
481
                "Unexpected end of message. Please check connectivity "
 
482
                "and permissions, and report a bug if problems persist.")
464
483
        return line
465
484
 
466
485
    def _read_line(self):
467
486
        """Helper for SmartClientMediumRequest.read_line.
468
 
        
 
487
 
469
488
        By default this forwards to self._medium._get_line because we are
470
489
        operating on the medium's stream.
471
490
        """
494
513
        """
495
514
        medium_repr = repr(medium)
496
515
        # Add this medium to the WeakKeyDictionary
497
 
        self.counts[medium] = [0, medium_repr]
 
516
        self.counts[medium] = dict(count=0, vfs_count=0,
 
517
                                   medium_repr=medium_repr)
498
518
        # Weakref callbacks are fired in reverse order of their association
499
519
        # with the referenced object.  So we add a weakref *after* adding to
500
520
        # the WeakKeyDict so that we can report the value from it before the
504
524
    def increment_call_count(self, params):
505
525
        # Increment the count in the WeakKeyDictionary
506
526
        value = self.counts[params.medium]
507
 
        value[0] += 1
 
527
        value['count'] += 1
 
528
        try:
 
529
            request_method = request.request_handlers.get(params.method)
 
530
        except KeyError:
 
531
            # A method we don't know about doesn't count as a VFS method.
 
532
            return
 
533
        if issubclass(request_method, vfs.VfsRequest):
 
534
            value['vfs_count'] += 1
508
535
 
509
536
    def done(self, ref):
510
537
        value = self.counts[ref]
511
 
        count, medium_repr = value
 
538
        count, vfs_count, medium_repr = (
 
539
            value['count'], value['vfs_count'], value['medium_repr'])
512
540
        # In case this callback is invoked for the same ref twice (by the
513
541
        # weakref callback and by the atexit function), set the call count back
514
542
        # to 0 so this item won't be reported twice.
515
 
        value[0] = 0
 
543
        value['count'] = 0
 
544
        value['vfs_count'] = 0
516
545
        if count != 0:
517
 
            trace.note('HPSS calls: %d %s', count, medium_repr)
518
 
        
 
546
            trace.note('HPSS calls: %d (%d vfs) %s',
 
547
                       count, vfs_count, medium_repr)
 
548
 
519
549
    def flush_all(self):
520
550
        for ref in list(self.counts.keys()):
521
551
            self.done(ref)
522
552
 
523
553
_debug_counter = None
524
 
  
525
 
  
 
554
 
 
555
 
526
556
class SmartClientMedium(SmartMedium):
527
557
    """Smart client is a medium for sending smart protocol requests over."""
528
558
 
573
603
        """
574
604
        if (self._remote_version_is_before is not None and
575
605
            version_tuple > self._remote_version_is_before):
 
606
            # We have been told that the remote side is older than some version
 
607
            # which is newer than a previously supplied older-than version.
 
608
            # This indicates that some smart verb call is not guarded
 
609
            # appropriately (it should simply not have been tried).
576
610
            raise AssertionError(
577
611
                "_remember_remote_is_before(%r) called, but "
578
612
                "_remember_remote_is_before(%r) was called previously."
616
650
 
617
651
    def disconnect(self):
618
652
        """If this medium maintains a persistent connection, close it.
619
 
        
 
653
 
620
654
        The default implementation does nothing.
621
655
        """
622
 
        
 
656
 
623
657
    def remote_path_from_transport(self, transport):
624
658
        """Convert transport into a path suitable for using in a request.
625
 
        
 
659
 
626
660
        Note that the resulting remote path doesn't encode the host name or
627
661
        anything but path, so it is only safe to use it in requests sent over
628
662
        the medium from the matching transport.
656
690
 
657
691
    def _flush(self):
658
692
        """Flush the output stream.
659
 
        
 
693
 
660
694
        This method is used by the SmartClientStreamMediumRequest to ensure that
661
695
        all data for a request is sent, to avoid long timeouts or deadlocks.
662
696
        """
673
707
 
674
708
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
675
709
    """A client medium using simple pipes.
676
 
    
 
710
 
677
711
    This client does not manage the pipes: it assumes they will always be open.
 
712
 
 
713
    Note that if readable_pipe.read might raise IOError or OSError with errno
 
714
    of EINTR, it must be safe to retry the read.  Plain CPython fileobjects
 
715
    (such as used for sys.stdin) are safe.
678
716
    """
679
717
 
680
718
    def __init__(self, readable_pipe, writeable_pipe, base):
685
723
    def _accept_bytes(self, bytes):
686
724
        """See SmartClientStreamMedium.accept_bytes."""
687
725
        self._writeable_pipe.write(bytes)
 
726
        self._report_activity(len(bytes), 'write')
688
727
 
689
728
    def _flush(self):
690
729
        """See SmartClientStreamMedium._flush()."""
692
731
 
693
732
    def _read_bytes(self, count):
694
733
        """See SmartClientStreamMedium._read_bytes."""
695
 
        return self._readable_pipe.read(count)
 
734
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
 
735
        self._report_activity(len(bytes), 'read')
 
736
        return bytes
696
737
 
697
738
 
698
739
class SmartSSHClientMedium(SmartClientStreamMedium):
699
740
    """A client medium using SSH."""
700
 
    
 
741
 
701
742
    def __init__(self, host, port=None, username=None, password=None,
702
743
            base=None, vendor=None, bzr_remote_path=None):
703
744
        """Creates a client that will connect on the first use.
704
 
        
 
745
 
705
746
        :param vendor: An optional override for the ssh vendor to use. See
706
747
            bzrlib.transport.ssh for details on ssh vendors.
707
748
        """
708
 
        SmartClientStreamMedium.__init__(self, base)
709
749
        self._connected = False
710
750
        self._host = host
711
751
        self._password = password
712
752
        self._port = port
713
753
        self._username = username
 
754
        # for the benefit of progress making a short description of this
 
755
        # transport
 
756
        self._scheme = 'bzr+ssh'
 
757
        # SmartClientStreamMedium stores the repr of this object in its
 
758
        # _DebugCounter so we have to store all the values used in our repr
 
759
        # method before calling the super init.
 
760
        SmartClientStreamMedium.__init__(self, base)
714
761
        self._read_from = None
715
762
        self._ssh_connection = None
716
763
        self._vendor = vendor
717
764
        self._write_to = None
718
765
        self._bzr_remote_path = bzr_remote_path
719
 
        if self._bzr_remote_path is None:
720
 
            symbol_versioning.warn(
721
 
                'bzr_remote_path is required as of bzr 0.92',
722
 
                DeprecationWarning, stacklevel=2)
723
 
            self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
766
 
 
767
    def __repr__(self):
 
768
        if self._port is None:
 
769
            maybe_port = ''
 
770
        else:
 
771
            maybe_port = ':%s' % self._port
 
772
        return "%s(%s://%s@%s%s/)" % (
 
773
            self.__class__.__name__,
 
774
            self._scheme,
 
775
            self._username,
 
776
            self._host,
 
777
            maybe_port)
724
778
 
725
779
    def _accept_bytes(self, bytes):
726
780
        """See SmartClientStreamMedium.accept_bytes."""
727
781
        self._ensure_connection()
728
782
        self._write_to.write(bytes)
 
783
        self._report_activity(len(bytes), 'write')
729
784
 
730
785
    def disconnect(self):
731
786
        """See SmartClientMedium.disconnect()."""
761
816
        if not self._connected:
762
817
            raise errors.MediumNotConnected(self)
763
818
        bytes_to_read = min(count, _MAX_READ_SIZE)
764
 
        return self._read_from.read(bytes_to_read)
 
819
        bytes = self._read_from.read(bytes_to_read)
 
820
        self._report_activity(len(bytes), 'read')
 
821
        return bytes
765
822
 
766
823
 
767
824
# Port 4155 is the default port for bzr://, registered with IANA.
771
828
 
772
829
class SmartTCPClientMedium(SmartClientStreamMedium):
773
830
    """A client medium using TCP."""
774
 
    
 
831
 
775
832
    def __init__(self, host, port, base):
776
833
        """Creates a client that will connect on the first use."""
777
834
        SmartClientStreamMedium.__init__(self, base)
783
840
    def _accept_bytes(self, bytes):
784
841
        """See SmartClientMedium.accept_bytes."""
785
842
        self._ensure_connection()
786
 
        osutils.send_all(self._socket, bytes)
 
843
        osutils.send_all(self._socket, bytes, self._report_activity)
787
844
 
788
845
    def disconnect(self):
789
846
        """See SmartClientMedium.disconnect()."""
802
859
        else:
803
860
            port = int(self._port)
804
861
        try:
805
 
            sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC, 
 
862
            sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
806
863
                socket.SOCK_STREAM, 0, 0)
807
864
        except socket.gaierror, (err_num, err_msg):
808
865
            raise errors.ConnectionError("failed to lookup %s:%d: %s" %
812
869
        for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
813
870
            try:
814
871
                self._socket = socket.socket(family, socktype, proto)
815
 
                self._socket.setsockopt(socket.IPPROTO_TCP, 
 
872
                self._socket.setsockopt(socket.IPPROTO_TCP,
816
873
                                        socket.TCP_NODELAY, 1)
817
874
                self._socket.connect(sockaddr)
818
875
            except socket.error, err:
834
891
 
835
892
    def _flush(self):
836
893
        """See SmartClientStreamMedium._flush().
837
 
        
838
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
894
 
 
895
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
839
896
        add a means to do a flush, but that can be done in the future.
840
897
        """
841
898
 
843
900
        """See SmartClientMedium.read_bytes."""
844
901
        if not self._connected:
845
902
            raise errors.MediumNotConnected(self)
846
 
        # We ignore the desired_count because on sockets it's more efficient to
847
 
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
848
 
        return self._socket.recv(_MAX_READ_SIZE)
 
903
        return osutils.read_bytes_from_socket(
 
904
            self._socket, self._report_activity)
849
905
 
850
906
 
851
907
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
864
920
 
865
921
    def _accept_bytes(self, bytes):
866
922
        """See SmartClientMediumRequest._accept_bytes.
867
 
        
 
923
 
868
924
        This forwards to self._medium._accept_bytes because we are operating
869
925
        on the mediums stream.
870
926
        """
873
929
    def _finished_reading(self):
874
930
        """See SmartClientMediumRequest._finished_reading.
875
931
 
876
 
        This clears the _current_request on self._medium to allow a new 
 
932
        This clears the _current_request on self._medium to allow a new
877
933
        request to be created.
878
934
        """
879
935
        if self._medium._current_request is not self:
880
936
            raise AssertionError()
881
937
        self._medium._current_request = None
882
 
        
 
938
 
883
939
    def _finished_writing(self):
884
940
        """See SmartClientMediumRequest._finished_writing.
885
941
 
887
943
        """
888
944
        self._medium._flush()
889
945
 
 
946