/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Robert Collins
  • Date: 2010-05-06 11:08:10 UTC
  • mto: This revision was merged to the branch mainline in revision 5223.
  • Revision ID: robertc@robertcollins.net-20100506110810-h3j07fh5gmw54s25
Cleaner matcher matching revised unlocking protocol.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2011 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
28
28
import sys
29
29
import urllib
30
30
 
31
 
import bzrlib
32
31
from bzrlib.lazy_import import lazy_import
33
32
lazy_import(globals(), """
 
33
import atexit
34
34
import socket
35
35
import thread
36
36
import weakref
38
38
from bzrlib import (
39
39
    debug,
40
40
    errors,
 
41
    symbol_versioning,
41
42
    trace,
42
43
    ui,
43
44
    urlutils,
44
45
    )
45
 
from bzrlib.i18n import gettext
46
46
from bzrlib.smart import client, protocol, request, vfs
47
47
from bzrlib.transport import ssh
48
48
""")
491
491
        return self._medium._get_line()
492
492
 
493
493
 
494
 
class _VfsRefuser(object):
495
 
    """An object that refuses all VFS requests.
496
 
 
497
 
    """
498
 
 
499
 
    def __init__(self):
500
 
        client._SmartClient.hooks.install_named_hook(
501
 
            'call', self.check_vfs, 'vfs refuser')
502
 
 
503
 
    def check_vfs(self, params):
504
 
        try:
505
 
            request_method = request.request_handlers.get(params.method)
506
 
        except KeyError:
507
 
            # A method we don't know about doesn't count as a VFS method.
508
 
            return
509
 
        if issubclass(request_method, vfs.VfsRequest):
510
 
            raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
511
 
 
512
 
 
513
494
class _DebugCounter(object):
514
495
    """An object that counts the HPSS calls made to each client medium.
515
496
 
516
 
    When a medium is garbage-collected, or failing that when
517
 
    bzrlib.global_state exits, the total number of calls made on that medium
518
 
    are reported via trace.note.
 
497
    When a medium is garbage-collected, or failing that when atexit functions
 
498
    are run, the total number of calls made on that medium are reported via
 
499
    trace.note.
519
500
    """
520
501
 
521
502
    def __init__(self):
522
503
        self.counts = weakref.WeakKeyDictionary()
523
504
        client._SmartClient.hooks.install_named_hook(
524
505
            'call', self.increment_call_count, 'hpss call counter')
525
 
        bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
 
506
        atexit.register(self.flush_all)
526
507
 
527
508
    def track(self, medium):
528
509
        """Start tracking calls made to a medium.
562
543
        value['count'] = 0
563
544
        value['vfs_count'] = 0
564
545
        if count != 0:
565
 
            trace.note(gettext('HPSS calls: {0} ({1} vfs) {2}').format(
566
 
                       count, vfs_count, medium_repr))
 
546
            trace.note('HPSS calls: %d (%d vfs) %s',
 
547
                       count, vfs_count, medium_repr)
567
548
 
568
549
    def flush_all(self):
569
550
        for ref in list(self.counts.keys()):
570
551
            self.done(ref)
571
552
 
572
553
_debug_counter = None
573
 
_vfs_refuser = None
574
554
 
575
555
 
576
556
class SmartClientMedium(SmartMedium):
593
573
            if _debug_counter is None:
594
574
                _debug_counter = _DebugCounter()
595
575
            _debug_counter.track(self)
596
 
        if 'hpss_client_no_vfs' in debug.debug_flags:
597
 
            global _vfs_refuser
598
 
            if _vfs_refuser is None:
599
 
                _vfs_refuser = _VfsRefuser()
600
576
 
601
577
    def _is_remote_before(self, version_tuple):
602
578
        """Is it possible the remote side supports RPCs for a given version?
739
715
    """A client medium using simple pipes.
740
716
 
741
717
    This client does not manage the pipes: it assumes they will always be open.
 
718
 
 
719
    Note that if readable_pipe.read might raise IOError or OSError with errno
 
720
    of EINTR, it must be safe to retry the read.  Plain CPython fileobjects
 
721
    (such as used for sys.stdin) are safe.
742
722
    """
743
723
 
744
724
    def __init__(self, readable_pipe, writeable_pipe, base):
757
737
 
758
738
    def _read_bytes(self, count):
759
739
        """See SmartClientStreamMedium._read_bytes."""
760
 
        bytes_to_read = min(count, _MAX_READ_SIZE)
761
 
        bytes = self._readable_pipe.read(bytes_to_read)
 
740
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
762
741
        self._report_activity(len(bytes), 'read')
763
742
        return bytes
764
743
 
765
744
 
766
 
class SSHParams(object):
767
 
    """A set of parameters for starting a remote bzr via SSH."""
 
745
class SmartSSHClientMedium(SmartClientStreamMedium):
 
746
    """A client medium using SSH."""
768
747
 
769
748
    def __init__(self, host, port=None, username=None, password=None,
770
 
            bzr_remote_path='bzr'):
771
 
        self.host = host
772
 
        self.port = port
773
 
        self.username = username
774
 
        self.password = password
775
 
        self.bzr_remote_path = bzr_remote_path
776
 
 
777
 
 
778
 
class SmartSSHClientMedium(SmartClientStreamMedium):
779
 
    """A client medium using SSH.
780
 
    
781
 
    It delegates IO to a SmartClientSocketMedium or
782
 
    SmartClientAlreadyConnectedSocketMedium (depending on platform).
783
 
    """
784
 
 
785
 
    def __init__(self, base, ssh_params, vendor=None):
 
749
            base=None, vendor=None, bzr_remote_path=None):
786
750
        """Creates a client that will connect on the first use.
787
751
 
788
 
        :param ssh_params: A SSHParams instance.
789
752
        :param vendor: An optional override for the ssh vendor to use. See
790
753
            bzrlib.transport.ssh for details on ssh vendors.
791
754
        """
792
 
        self._real_medium = None
793
 
        self._ssh_params = ssh_params
 
755
        self._connected = False
 
756
        self._host = host
 
757
        self._password = password
 
758
        self._port = port
 
759
        self._username = username
794
760
        # for the benefit of progress making a short description of this
795
761
        # transport
796
762
        self._scheme = 'bzr+ssh'
798
764
        # _DebugCounter so we have to store all the values used in our repr
799
765
        # method before calling the super init.
800
766
        SmartClientStreamMedium.__init__(self, base)
 
767
        self._read_from = None
 
768
        self._ssh_connection = None
801
769
        self._vendor = vendor
802
 
        self._ssh_connection = None
 
770
        self._write_to = None
 
771
        self._bzr_remote_path = bzr_remote_path
803
772
 
804
773
    def __repr__(self):
805
 
        if self._ssh_params.port is None:
 
774
        if self._port is None:
806
775
            maybe_port = ''
807
776
        else:
808
 
            maybe_port = ':%s' % self._ssh_params.port
 
777
            maybe_port = ':%s' % self._port
809
778
        return "%s(%s://%s@%s%s/)" % (
810
779
            self.__class__.__name__,
811
780
            self._scheme,
812
 
            self._ssh_params.username,
813
 
            self._ssh_params.host,
 
781
            self._username,
 
782
            self._host,
814
783
            maybe_port)
815
784
 
816
785
    def _accept_bytes(self, bytes):
817
786
        """See SmartClientStreamMedium.accept_bytes."""
818
787
        self._ensure_connection()
819
 
        self._real_medium.accept_bytes(bytes)
 
788
        self._write_to.write(bytes)
 
789
        self._report_activity(len(bytes), 'write')
820
790
 
821
791
    def disconnect(self):
822
792
        """See SmartClientMedium.disconnect()."""
823
 
        if self._real_medium is not None:
824
 
            self._real_medium.disconnect()
825
 
            self._real_medium = None
826
 
        if self._ssh_connection is not None:
827
 
            self._ssh_connection.close()
828
 
            self._ssh_connection = None
 
793
        if not self._connected:
 
794
            return
 
795
        self._read_from.close()
 
796
        self._write_to.close()
 
797
        self._ssh_connection.close()
 
798
        self._connected = False
829
799
 
830
800
    def _ensure_connection(self):
831
801
        """Connect this medium if not already connected."""
832
 
        if self._real_medium is not None:
 
802
        if self._connected:
833
803
            return
834
804
        if self._vendor is None:
835
805
            vendor = ssh._get_ssh_vendor()
836
806
        else:
837
807
            vendor = self._vendor
838
 
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
839
 
                self._ssh_params.password, self._ssh_params.host,
840
 
                self._ssh_params.port,
841
 
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
 
808
        self._ssh_connection = vendor.connect_ssh(self._username,
 
809
                self._password, self._host, self._port,
 
810
                command=[self._bzr_remote_path, 'serve', '--inet',
842
811
                         '--directory=/', '--allow-writes'])
843
 
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
844
 
        if io_kind == 'socket':
845
 
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
846
 
                self.base, io_object)
847
 
        elif io_kind == 'pipes':
848
 
            read_from, write_to = io_object
849
 
            self._real_medium = SmartSimplePipesClientMedium(
850
 
                read_from, write_to, self.base)
851
 
        else:
852
 
            raise AssertionError(
853
 
                "Unexpected io_kind %r from %r"
854
 
                % (io_kind, self._ssh_connection))
 
812
        self._read_from, self._write_to = \
 
813
            self._ssh_connection.get_filelike_channels()
 
814
        self._connected = True
855
815
 
856
816
    def _flush(self):
857
817
        """See SmartClientStreamMedium._flush()."""
858
 
        self._real_medium._flush()
 
818
        self._write_to.flush()
859
819
 
860
820
    def _read_bytes(self, count):
861
821
        """See SmartClientStreamMedium.read_bytes."""
862
 
        if self._real_medium is None:
 
822
        if not self._connected:
863
823
            raise errors.MediumNotConnected(self)
864
 
        return self._real_medium.read_bytes(count)
 
824
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
825
        bytes = self._read_from.read(bytes_to_read)
 
826
        self._report_activity(len(bytes), 'read')
 
827
        return bytes
865
828
 
866
829
 
867
830
# Port 4155 is the default port for bzr://, registered with IANA.
869
832
BZR_DEFAULT_PORT = 4155
870
833
 
871
834
 
872
 
class SmartClientSocketMedium(SmartClientStreamMedium):
873
 
    """A client medium using a socket.
874
 
    
875
 
    This class isn't usable directly.  Use one of its subclasses instead.
876
 
    """
 
835
class SmartTCPClientMedium(SmartClientStreamMedium):
 
836
    """A client medium using TCP."""
877
837
 
878
 
    def __init__(self, base):
 
838
    def __init__(self, host, port, base):
 
839
        """Creates a client that will connect on the first use."""
879
840
        SmartClientStreamMedium.__init__(self, base)
 
841
        self._connected = False
 
842
        self._host = host
 
843
        self._port = port
880
844
        self._socket = None
881
 
        self._connected = False
882
845
 
883
846
    def _accept_bytes(self, bytes):
884
847
        """See SmartClientMedium.accept_bytes."""
885
848
        self._ensure_connection()
886
849
        osutils.send_all(self._socket, bytes, self._report_activity)
887
850
 
888
 
    def _ensure_connection(self):
889
 
        """Connect this medium if not already connected."""
890
 
        raise NotImplementedError(self._ensure_connection)
891
 
 
892
 
    def _flush(self):
893
 
        """See SmartClientStreamMedium._flush().
894
 
 
895
 
        For sockets we do no flushing. For TCP sockets we may want to turn off
896
 
        TCP_NODELAY and add a means to do a flush, but that can be done in the
897
 
        future.
898
 
        """
899
 
 
900
 
    def _read_bytes(self, count):
901
 
        """See SmartClientMedium.read_bytes."""
902
 
        if not self._connected:
903
 
            raise errors.MediumNotConnected(self)
904
 
        return osutils.read_bytes_from_socket(
905
 
            self._socket, self._report_activity)
906
 
 
907
851
    def disconnect(self):
908
852
        """See SmartClientMedium.disconnect()."""
909
853
        if not self._connected:
912
856
        self._socket = None
913
857
        self._connected = False
914
858
 
915
 
 
916
 
class SmartTCPClientMedium(SmartClientSocketMedium):
917
 
    """A client medium that creates a TCP connection."""
918
 
 
919
 
    def __init__(self, host, port, base):
920
 
        """Creates a client that will connect on the first use."""
921
 
        SmartClientSocketMedium.__init__(self, base)
922
 
        self._host = host
923
 
        self._port = port
924
 
 
925
859
    def _ensure_connection(self):
926
860
        """Connect this medium if not already connected."""
927
861
        if self._connected:
961
895
                    (self._host, port, err_msg))
962
896
        self._connected = True
963
897
 
964
 
 
965
 
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
966
 
    """A client medium for an already connected socket.
967
 
    
968
 
    Note that this class will assume it "owns" the socket, so it will close it
969
 
    when its disconnect method is called.
970
 
    """
971
 
 
972
 
    def __init__(self, base, sock):
973
 
        SmartClientSocketMedium.__init__(self, base)
974
 
        self._socket = sock
975
 
        self._connected = True
976
 
 
977
 
    def _ensure_connection(self):
978
 
        # Already connected, by definition!  So nothing to do.
979
 
        pass
 
898
    def _flush(self):
 
899
        """See SmartClientStreamMedium._flush().
 
900
 
 
901
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
 
902
        add a means to do a flush, but that can be done in the future.
 
903
        """
 
904
 
 
905
    def _read_bytes(self, count):
 
906
        """See SmartClientMedium.read_bytes."""
 
907
        if not self._connected:
 
908
            raise errors.MediumNotConnected(self)
 
909
        return osutils.read_bytes_from_socket(
 
910
            self._socket, self._report_activity)
980
911
 
981
912
 
982
913
class SmartClientStreamMediumRequest(SmartClientMediumRequest):