/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-04-05 07:31:43 UTC
  • mfrom: (2376.3.9 hpss-hooks)
  • Revision ID: pqm@pqm.ubuntu.com-20070405073143-8fa894c829ab5e50
(robertc) SmartServer startup and shutdown hooks, and smart server shutdown logic implementation. (Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
195
195
#
196
196
 
197
197
from cStringIO import StringIO
 
198
import errno
198
199
import os
199
200
import socket
200
201
import sys
212
213
    urlutils,
213
214
    )
214
215
from bzrlib.bundle.serializer import write_bundle
 
216
from bzrlib.hooks import Hooks
215
217
try:
216
218
    from bzrlib.transport import ssh
217
219
except errors.ParamikoNotPresent:
820
822
 
821
823
 
822
824
class SmartTCPServer(object):
823
 
    """Listens on a TCP socket and accepts connections from smart clients"""
 
825
    """Listens on a TCP socket and accepts connections from smart clients
 
826
 
 
827
    hooks: An instance of SmartServerHooks.
 
828
    """
824
829
 
825
830
    def __init__(self, backing_transport, host='127.0.0.1', port=0):
826
831
        """Construct a new server.
831
836
        :param host: Name of the interface to listen on.
832
837
        :param port: TCP port to listen on, or 0 to allocate a transient port.
833
838
        """
834
 
        self._server_socket = socket.socket()
835
 
        self._server_socket.bind((host, port))
836
 
        self.port = self._server_socket.getsockname()[1]
837
 
        self._server_socket.listen(1)
838
 
        self._server_socket.settimeout(1)
839
 
        self.backing_transport = backing_transport
840
 
 
841
 
    def serve(self):
842
839
        # let connections timeout so that we get a chance to terminate
843
840
        # Keep a reference to the exceptions we want to catch because the socket
844
841
        # module's globals get set to None during interpreter shutdown.
845
842
        from socket import timeout as socket_timeout
846
843
        from socket import error as socket_error
 
844
        self._socket_error = socket_error
 
845
        self._socket_timeout = socket_timeout
 
846
        self._server_socket = socket.socket()
 
847
        self._server_socket.bind((host, port))
 
848
        self._sockname = self._server_socket.getsockname()
 
849
        self.port = self._sockname[1]
 
850
        self._server_socket.listen(1)
 
851
        self._server_socket.settimeout(1)
 
852
        self.backing_transport = backing_transport
 
853
        self._started = threading.Event()
 
854
        self._stopped = threading.Event()
 
855
 
 
856
    def serve(self):
847
857
        self._should_terminate = False
848
 
        while not self._should_terminate:
849
 
            try:
850
 
                self.accept_and_serve()
851
 
            except socket_timeout:
852
 
                # just check if we're asked to stop
853
 
                pass
854
 
            except socket_error, e:
855
 
                trace.warning("client disconnected: %s", e)
856
 
                pass
 
858
        for hook in SmartTCPServer.hooks['server_started']:
 
859
            hook(self.backing_transport.base, self.get_url())
 
860
        self._started.set()
 
861
        try:
 
862
            try:
 
863
                while not self._should_terminate:
 
864
                    try:
 
865
                        conn, client_addr = self._server_socket.accept()
 
866
                    except self._socket_timeout:
 
867
                        # just check if we're asked to stop
 
868
                        pass
 
869
                    except self._socket_error, e:
 
870
                        # if the socket is closed by stop_background_thread
 
871
                        # we might get a EBADF here, any other socket errors
 
872
                        # should get logged.
 
873
                        if e.args[0] != errno.EBADF:
 
874
                            trace.warning("listening socket error: %s", e)
 
875
                    else:
 
876
                        self.serve_conn(conn)
 
877
            except KeyboardInterrupt:
 
878
                # dont log when CTRL-C'd.
 
879
                raise
 
880
            except Exception, e:
 
881
                trace.error("Unhandled smart server error.")
 
882
                trace.log_exception_quietly()
 
883
                raise
 
884
        finally:
 
885
            self._stopped.set()
 
886
            try:
 
887
                # ensure the server socket is closed.
 
888
                self._server_socket.close()
 
889
            except self._socket_error:
 
890
                # ignore errors on close
 
891
                pass
 
892
            for hook in SmartTCPServer.hooks['server_stopped']:
 
893
                hook(self.backing_transport.base, self.get_url())
857
894
 
858
895
    def get_url(self):
859
896
        """Return the url of the server"""
860
 
        return "bzr://%s:%d/" % self._server_socket.getsockname()
 
897
        return "bzr://%s:%d/" % self._sockname
861
898
 
862
 
    def accept_and_serve(self):
863
 
        conn, client_addr = self._server_socket.accept()
 
899
    def serve_conn(self, conn):
864
900
        # For WIN32, where the timeout value from the listening socket
865
901
        # propogates to the newly accepted socket.
866
902
        conn.setblocking(True)
871
907
        connection_thread.start()
872
908
 
873
909
    def start_background_thread(self):
 
910
        self._started.clear()
874
911
        self._server_thread = threading.Thread(None,
875
912
                self.serve,
876
913
                name='server-' + self.get_url())
877
914
        self._server_thread.setDaemon(True)
878
915
        self._server_thread.start()
 
916
        self._started.wait()
879
917
 
880
918
    def stop_background_thread(self):
 
919
        self._stopped.clear()
 
920
        # tell the main loop to quit on the next iteration.
881
921
        self._should_terminate = True
882
 
        # At one point we would wait to join the threads here, but it looks
883
 
        # like they don't actually exit.  So now we just leave them running
884
 
        # and expect to terminate the process. -- mbp 20070215
885
 
        # self._server_socket.close()
886
 
        ## sys.stderr.write("waiting for server thread to finish...")
887
 
        ## self._server_thread.join()
 
922
        # close the socket - gives error to connections from here on in,
 
923
        # rather than a connection reset error to connections made during
 
924
        # the period between setting _should_terminate = True and 
 
925
        # the current request completing/aborting. It may also break out the
 
926
        # main loop if it was currently in accept() (on some platforms).
 
927
        try:
 
928
            self._server_socket.close()
 
929
        except self._socket_error:
 
930
            # ignore errors on close
 
931
            pass
 
932
        if not self._stopped.isSet():
 
933
            # server has not stopped (though it may be stopping)
 
934
            # its likely in accept(), so give it a connection
 
935
            temp_socket = socket.socket()
 
936
            temp_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
937
            if not temp_socket.connect_ex(self._sockname):
 
938
                # and close it immediately: we dont choose to send any requests.
 
939
                temp_socket.close()
 
940
        self._stopped.wait()
 
941
        self._server_thread.join()
 
942
 
 
943
 
 
944
class SmartServerHooks(Hooks):
 
945
    """Hooks for the smart server."""
 
946
 
 
947
    def __init__(self):
 
948
        """Create the default hooks.
 
949
 
 
950
        These are all empty initially, because by default nothing should get
 
951
        notified.
 
952
        """
 
953
        Hooks.__init__(self)
 
954
        # Introduced in 0.16:
 
955
        # invoked whenever the server starts serving a directory.
 
956
        # The api signature is (backing url, public url).
 
957
        self['server_started'] = []
 
958
        # Introduced in 0.16:
 
959
        # invoked whenever the server stops serving a directory.
 
960
        # The api signature is (backing url, public url).
 
961
        self['server_stopped'] = []
 
962
 
 
963
SmartTCPServer.hooks = SmartServerHooks()
888
964
 
889
965
 
890
966
class SmartTCPServer_for_testing(SmartTCPServer):