/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Robert Collins
  • Date: 2010-05-06 11:08:10 UTC
  • mto: This revision was merged to the branch mainline in revision 5223.
  • Revision ID: robertc@robertcollins.net-20100506110810-h3j07fh5gmw54s25
Cleaner matcher matching revised unlocking protocol.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2011 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
28
28
import sys
29
29
import urllib
30
30
 
31
 
import bzrlib
32
31
from bzrlib.lazy_import import lazy_import
33
32
lazy_import(globals(), """
 
33
import atexit
34
34
import socket
35
35
import thread
36
36
import weakref
38
38
from bzrlib import (
39
39
    debug,
40
40
    errors,
 
41
    symbol_versioning,
41
42
    trace,
42
43
    ui,
43
44
    urlutils,
490
491
        return self._medium._get_line()
491
492
 
492
493
 
493
 
class _VfsRefuser(object):
494
 
    """An object that refuses all VFS requests.
495
 
 
496
 
    """
497
 
 
498
 
    def __init__(self):
499
 
        client._SmartClient.hooks.install_named_hook(
500
 
            'call', self.check_vfs, 'vfs refuser')
501
 
 
502
 
    def check_vfs(self, params):
503
 
        try:
504
 
            request_method = request.request_handlers.get(params.method)
505
 
        except KeyError:
506
 
            # A method we don't know about doesn't count as a VFS method.
507
 
            return
508
 
        if issubclass(request_method, vfs.VfsRequest):
509
 
            raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
510
 
 
511
 
 
512
494
class _DebugCounter(object):
513
495
    """An object that counts the HPSS calls made to each client medium.
514
496
 
515
 
    When a medium is garbage-collected, or failing that when
516
 
    bzrlib.global_state exits, the total number of calls made on that medium
517
 
    are reported via trace.note.
 
497
    When a medium is garbage-collected, or failing that when atexit functions
 
498
    are run, the total number of calls made on that medium are reported via
 
499
    trace.note.
518
500
    """
519
501
 
520
502
    def __init__(self):
521
503
        self.counts = weakref.WeakKeyDictionary()
522
504
        client._SmartClient.hooks.install_named_hook(
523
505
            'call', self.increment_call_count, 'hpss call counter')
524
 
        bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
 
506
        atexit.register(self.flush_all)
525
507
 
526
508
    def track(self, medium):
527
509
        """Start tracking calls made to a medium.
569
551
            self.done(ref)
570
552
 
571
553
_debug_counter = None
572
 
_vfs_refuser = None
573
554
 
574
555
 
575
556
class SmartClientMedium(SmartMedium):
592
573
            if _debug_counter is None:
593
574
                _debug_counter = _DebugCounter()
594
575
            _debug_counter.track(self)
595
 
        if 'hpss_client_no_vfs' in debug.debug_flags:
596
 
            global _vfs_refuser
597
 
            if _vfs_refuser is None:
598
 
                _vfs_refuser = _VfsRefuser()
599
576
 
600
577
    def _is_remote_before(self, version_tuple):
601
578
        """Is it possible the remote side supports RPCs for a given version?
738
715
    """A client medium using simple pipes.
739
716
 
740
717
    This client does not manage the pipes: it assumes they will always be open.
 
718
 
 
719
    Note that if readable_pipe.read might raise IOError or OSError with errno
 
720
    of EINTR, it must be safe to retry the read.  Plain CPython fileobjects
 
721
    (such as used for sys.stdin) are safe.
741
722
    """
742
723
 
743
724
    def __init__(self, readable_pipe, writeable_pipe, base):
756
737
 
757
738
    def _read_bytes(self, count):
758
739
        """See SmartClientStreamMedium._read_bytes."""
759
 
        bytes_to_read = min(count, _MAX_READ_SIZE)
760
 
        bytes = self._readable_pipe.read(bytes_to_read)
 
740
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
761
741
        self._report_activity(len(bytes), 'read')
762
742
        return bytes
763
743
 
764
744
 
765
 
class SSHParams(object):
766
 
    """A set of parameters for starting a remote bzr via SSH."""
 
745
class SmartSSHClientMedium(SmartClientStreamMedium):
 
746
    """A client medium using SSH."""
767
747
 
768
748
    def __init__(self, host, port=None, username=None, password=None,
769
 
            bzr_remote_path='bzr'):
770
 
        self.host = host
771
 
        self.port = port
772
 
        self.username = username
773
 
        self.password = password
774
 
        self.bzr_remote_path = bzr_remote_path
775
 
 
776
 
 
777
 
class SmartSSHClientMedium(SmartClientStreamMedium):
778
 
    """A client medium using SSH.
779
 
    
780
 
    It delegates IO to a SmartClientSocketMedium or
781
 
    SmartClientAlreadyConnectedSocketMedium (depending on platform).
782
 
    """
783
 
 
784
 
    def __init__(self, base, ssh_params, vendor=None):
 
749
            base=None, vendor=None, bzr_remote_path=None):
785
750
        """Creates a client that will connect on the first use.
786
751
 
787
 
        :param ssh_params: A SSHParams instance.
788
752
        :param vendor: An optional override for the ssh vendor to use. See
789
753
            bzrlib.transport.ssh for details on ssh vendors.
790
754
        """
791
 
        self._real_medium = None
792
 
        self._ssh_params = ssh_params
 
755
        self._connected = False
 
756
        self._host = host
 
757
        self._password = password
 
758
        self._port = port
 
759
        self._username = username
793
760
        # for the benefit of progress making a short description of this
794
761
        # transport
795
762
        self._scheme = 'bzr+ssh'
797
764
        # _DebugCounter so we have to store all the values used in our repr
798
765
        # method before calling the super init.
799
766
        SmartClientStreamMedium.__init__(self, base)
 
767
        self._read_from = None
 
768
        self._ssh_connection = None
800
769
        self._vendor = vendor
801
 
        self._ssh_connection = None
 
770
        self._write_to = None
 
771
        self._bzr_remote_path = bzr_remote_path
802
772
 
803
773
    def __repr__(self):
804
 
        if self._ssh_params.port is None:
 
774
        if self._port is None:
805
775
            maybe_port = ''
806
776
        else:
807
 
            maybe_port = ':%s' % self._ssh_params.port
 
777
            maybe_port = ':%s' % self._port
808
778
        return "%s(%s://%s@%s%s/)" % (
809
779
            self.__class__.__name__,
810
780
            self._scheme,
811
 
            self._ssh_params.username,
812
 
            self._ssh_params.host,
 
781
            self._username,
 
782
            self._host,
813
783
            maybe_port)
814
784
 
815
785
    def _accept_bytes(self, bytes):
816
786
        """See SmartClientStreamMedium.accept_bytes."""
817
787
        self._ensure_connection()
818
 
        self._real_medium.accept_bytes(bytes)
 
788
        self._write_to.write(bytes)
 
789
        self._report_activity(len(bytes), 'write')
819
790
 
820
791
    def disconnect(self):
821
792
        """See SmartClientMedium.disconnect()."""
822
 
        if self._real_medium is not None:
823
 
            self._real_medium.disconnect()
824
 
            self._real_medium = None
825
 
        if self._ssh_connection is not None:
826
 
            self._ssh_connection.close()
827
 
            self._ssh_connection = None
 
793
        if not self._connected:
 
794
            return
 
795
        self._read_from.close()
 
796
        self._write_to.close()
 
797
        self._ssh_connection.close()
 
798
        self._connected = False
828
799
 
829
800
    def _ensure_connection(self):
830
801
        """Connect this medium if not already connected."""
831
 
        if self._real_medium is not None:
 
802
        if self._connected:
832
803
            return
833
804
        if self._vendor is None:
834
805
            vendor = ssh._get_ssh_vendor()
835
806
        else:
836
807
            vendor = self._vendor
837
 
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
838
 
                self._ssh_params.password, self._ssh_params.host,
839
 
                self._ssh_params.port,
840
 
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
 
808
        self._ssh_connection = vendor.connect_ssh(self._username,
 
809
                self._password, self._host, self._port,
 
810
                command=[self._bzr_remote_path, 'serve', '--inet',
841
811
                         '--directory=/', '--allow-writes'])
842
 
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
843
 
        if io_kind == 'socket':
844
 
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
845
 
                self.base, io_object)
846
 
        elif io_kind == 'pipes':
847
 
            read_from, write_to = io_object
848
 
            self._real_medium = SmartSimplePipesClientMedium(
849
 
                read_from, write_to, self.base)
850
 
        else:
851
 
            raise AssertionError(
852
 
                "Unexpected io_kind %r from %r"
853
 
                % (io_kind, self._ssh_connection))
 
812
        self._read_from, self._write_to = \
 
813
            self._ssh_connection.get_filelike_channels()
 
814
        self._connected = True
854
815
 
855
816
    def _flush(self):
856
817
        """See SmartClientStreamMedium._flush()."""
857
 
        self._real_medium._flush()
 
818
        self._write_to.flush()
858
819
 
859
820
    def _read_bytes(self, count):
860
821
        """See SmartClientStreamMedium.read_bytes."""
861
 
        if self._real_medium is None:
 
822
        if not self._connected:
862
823
            raise errors.MediumNotConnected(self)
863
 
        return self._real_medium.read_bytes(count)
 
824
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
825
        bytes = self._read_from.read(bytes_to_read)
 
826
        self._report_activity(len(bytes), 'read')
 
827
        return bytes
864
828
 
865
829
 
866
830
# Port 4155 is the default port for bzr://, registered with IANA.
868
832
BZR_DEFAULT_PORT = 4155
869
833
 
870
834
 
871
 
class SmartClientSocketMedium(SmartClientStreamMedium):
872
 
    """A client medium using a socket.
873
 
    
874
 
    This class isn't usable directly.  Use one of its subclasses instead.
875
 
    """
 
835
class SmartTCPClientMedium(SmartClientStreamMedium):
 
836
    """A client medium using TCP."""
876
837
 
877
 
    def __init__(self, base):
 
838
    def __init__(self, host, port, base):
 
839
        """Creates a client that will connect on the first use."""
878
840
        SmartClientStreamMedium.__init__(self, base)
 
841
        self._connected = False
 
842
        self._host = host
 
843
        self._port = port
879
844
        self._socket = None
880
 
        self._connected = False
881
845
 
882
846
    def _accept_bytes(self, bytes):
883
847
        """See SmartClientMedium.accept_bytes."""
884
848
        self._ensure_connection()
885
849
        osutils.send_all(self._socket, bytes, self._report_activity)
886
850
 
887
 
    def _ensure_connection(self):
888
 
        """Connect this medium if not already connected."""
889
 
        raise NotImplementedError(self._ensure_connection)
890
 
 
891
 
    def _flush(self):
892
 
        """See SmartClientStreamMedium._flush().
893
 
 
894
 
        For sockets we do no flushing. For TCP sockets we may want to turn off
895
 
        TCP_NODELAY and add a means to do a flush, but that can be done in the
896
 
        future.
897
 
        """
898
 
 
899
 
    def _read_bytes(self, count):
900
 
        """See SmartClientMedium.read_bytes."""
901
 
        if not self._connected:
902
 
            raise errors.MediumNotConnected(self)
903
 
        return osutils.read_bytes_from_socket(
904
 
            self._socket, self._report_activity)
905
 
 
906
851
    def disconnect(self):
907
852
        """See SmartClientMedium.disconnect()."""
908
853
        if not self._connected:
911
856
        self._socket = None
912
857
        self._connected = False
913
858
 
914
 
 
915
 
class SmartTCPClientMedium(SmartClientSocketMedium):
916
 
    """A client medium that creates a TCP connection."""
917
 
 
918
 
    def __init__(self, host, port, base):
919
 
        """Creates a client that will connect on the first use."""
920
 
        SmartClientSocketMedium.__init__(self, base)
921
 
        self._host = host
922
 
        self._port = port
923
 
 
924
859
    def _ensure_connection(self):
925
860
        """Connect this medium if not already connected."""
926
861
        if self._connected:
960
895
                    (self._host, port, err_msg))
961
896
        self._connected = True
962
897
 
963
 
 
964
 
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
965
 
    """A client medium for an already connected socket.
966
 
    
967
 
    Note that this class will assume it "owns" the socket, so it will close it
968
 
    when its disconnect method is called.
969
 
    """
970
 
 
971
 
    def __init__(self, base, sock):
972
 
        SmartClientSocketMedium.__init__(self, base)
973
 
        self._socket = sock
974
 
        self._connected = True
975
 
 
976
 
    def _ensure_connection(self):
977
 
        # Already connected, by definition!  So nothing to do.
978
 
        pass
 
898
    def _flush(self):
 
899
        """See SmartClientStreamMedium._flush().
 
900
 
 
901
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
 
902
        add a means to do a flush, but that can be done in the future.
 
903
        """
 
904
 
 
905
    def _read_bytes(self, count):
 
906
        """See SmartClientMedium.read_bytes."""
 
907
        if not self._connected:
 
908
            raise errors.MediumNotConnected(self)
 
909
        return osutils.read_bytes_from_socket(
 
910
            self._socket, self._report_activity)
979
911
 
980
912
 
981
913
class SmartClientStreamMediumRequest(SmartClientMediumRequest):