/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to brzlib/smart/medium.py

  • Committer: Jelmer Vernooij
  • Date: 2017-05-21 12:41:27 UTC
  • mto: This revision was merged to the branch mainline in revision 6623.
  • Revision ID: jelmer@jelmer.uk-20170521124127-iv8etg0vwymyai6y
s/bzr/brz/ in apport config.

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
 
22
22
Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
23
23
over SSH), and pass them to and from the protocol logic.  See the overview in
24
 
breezy/transport/smart/__init__.py.
 
24
brzlib/transport/smart/__init__.py.
25
25
"""
26
26
 
 
27
from __future__ import absolute_import
 
28
 
27
29
import errno
28
 
import io
29
30
import os
30
31
import sys
31
32
import time
32
33
 
33
 
try:
34
 
    import _thread
35
 
except ImportError:
36
 
    import thread as _thread
37
 
 
38
 
import breezy
39
 
from ...lazy_import import lazy_import
 
34
import brzlib
 
35
from brzlib.lazy_import import lazy_import
40
36
lazy_import(globals(), """
41
37
import select
42
38
import socket
 
39
import thread
43
40
import weakref
44
41
 
45
 
from breezy import (
 
42
from brzlib import (
46
43
    debug,
 
44
    errors,
47
45
    trace,
48
46
    transport,
49
47
    ui,
50
48
    urlutils,
51
49
    )
52
 
from breezy.i18n import gettext
53
 
from breezy.bzr.smart import client, protocol, request, signals, vfs
54
 
from breezy.transport import ssh
 
50
from brzlib.i18n import gettext
 
51
from brzlib.smart import client, protocol, request, signals, vfs
 
52
from brzlib.transport import ssh
55
53
""")
56
 
from ... import (
57
 
    errors,
58
 
    osutils,
59
 
    )
 
54
from brzlib import osutils
60
55
 
61
56
# Throughout this module buffer size parameters are either limited to be at
62
57
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
64
59
# from non-sockets as well.
65
60
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
66
61
 
67
 
 
68
 
class HpssVfsRequestNotAllowed(errors.BzrError):
69
 
 
70
 
    _fmt = ("VFS requests over the smart server are not allowed. Encountered: "
71
 
            "%(method)s, %(arguments)s.")
72
 
 
73
 
    def __init__(self, method, arguments):
74
 
        self.method = method
75
 
        self.arguments = arguments
76
 
 
77
 
 
78
62
def _get_protocol_factory_for_bytes(bytes):
79
63
    """Determine the right protocol factory for 'bytes'.
80
64
 
116
100
    :returns: a tuple of two strs: (line, excess)
117
101
    """
118
102
    newline_pos = -1
119
 
    bytes = b''
 
103
    bytes = ''
120
104
    while newline_pos == -1:
121
105
        new_bytes = read_bytes_func(1)
122
106
        bytes += new_bytes
123
 
        if new_bytes == b'':
 
107
        if new_bytes == '':
124
108
            # Ran out of bytes before receiving a complete line.
125
 
            return bytes, b''
126
 
        newline_pos = bytes.find(b'\n')
127
 
    line = bytes[:newline_pos + 1]
128
 
    excess = bytes[newline_pos + 1:]
 
109
            return bytes, ''
 
110
        newline_pos = bytes.find('\n')
 
111
    line = bytes[:newline_pos+1]
 
112
    excess = bytes[newline_pos+1:]
129
113
    return line, excess
130
114
 
131
115
 
135
119
    def __init__(self):
136
120
        self._push_back_buffer = None
137
121
 
138
 
    def _push_back(self, data):
 
122
    def _push_back(self, bytes):
139
123
        """Return unused bytes to the medium, because they belong to the next
140
124
        request(s).
141
125
 
142
126
        This sets the _push_back_buffer to the given bytes.
143
127
        """
144
 
        if not isinstance(data, bytes):
145
 
            raise TypeError(data)
146
128
        if self._push_back_buffer is not None:
147
129
            raise AssertionError(
148
130
                "_push_back called when self._push_back_buffer is %r"
149
131
                % (self._push_back_buffer,))
150
 
        if data == b'':
 
132
        if bytes == '':
151
133
            return
152
 
        self._push_back_buffer = data
 
134
        self._push_back_buffer = bytes
153
135
 
154
136
    def _get_push_back_buffer(self):
155
 
        if self._push_back_buffer == b'':
 
137
        if self._push_back_buffer == '':
156
138
            raise AssertionError(
157
139
                '%s._push_back_buffer should never be the empty string, '
158
140
                'which can be confused with EOF' % (self,))
251
233
            while not self.finished:
252
234
                server_protocol = self._build_protocol()
253
235
                self._serve_one_request(server_protocol)
254
 
        except errors.ConnectionTimeout as e:
 
236
        except errors.ConnectionTimeout, e:
255
237
            trace.note('%s' % (e,))
256
238
            trace.log_exception_quietly()
257
239
            self._disconnect_client()
258
240
            # We reported it, no reason to make a big fuss.
259
241
            return
260
 
        except Exception as e:
 
242
        except Exception, e:
261
243
            stderr.write("%s terminating on exception %s\n" % (self, e))
262
244
            raise
263
245
        self._disconnect_client()
326
308
                    # select.error doesn't have 'errno', it just has args[0]
327
309
                    err = e.args[0]
328
310
                if err in _bad_file_descriptor:
329
 
                    return  # Not a socket indicates read() will fail
 
311
                    return # Not a socket indicates read() will fail
330
312
                elif err == errno.EINTR:
331
313
                    # Interrupted, keep looping.
332
314
                    continue
333
315
                raise
334
 
            except ValueError:
335
 
                return  # Socket may already be closed
336
316
        if rs or xs:
337
317
            return
338
318
        raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
349
329
            self._serve_one_request_unguarded(protocol)
350
330
        except KeyboardInterrupt:
351
331
            raise
352
 
        except Exception as e:
 
332
        except Exception, e:
353
333
            self.terminate_due_to_error()
354
334
 
355
335
    def terminate_due_to_error(self):
389
369
 
390
370
    def __repr__(self):
391
371
        return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
392
 
                                     self._client_info)
 
372
            self._client_info)
393
373
 
394
374
    def _serve_one_request_unguarded(self, protocol):
395
375
        while protocol.next_read_size():
397
377
            # than MAX_SOCKET_CHUNK ready, the socket will just return a
398
378
            # short read immediately rather than block.
399
379
            bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
400
 
            if bytes == b'':
 
380
            if bytes == '':
401
381
                self.finished = True
402
382
                return
403
383
            protocol.accept_bytes(bytes)
431
411
        self.finished = True
432
412
 
433
413
    def _write_out(self, bytes):
434
 
        tstart = osutils.perf_counter()
 
414
        tstart = osutils.timer_func()
435
415
        osutils.send_all(self.socket, bytes, self._report_activity)
436
416
        if 'hpss' in debug.debug_flags:
437
 
            thread_id = _thread.get_ident()
 
417
            thread_id = thread.get_ident()
438
418
            trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
439
419
                         % ('wrote', thread_id, len(bytes),
440
 
                            osutils.perf_counter() - tstart))
 
420
                            osutils.timer_func() - tstart))
441
421
 
442
422
 
443
423
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
450
430
        :param backing_transport: Transport for the directory served.
451
431
        """
452
432
        SmartServerStreamMedium.__init__(self, backing_transport,
453
 
                                         timeout=timeout)
 
433
            timeout=timeout)
454
434
        if sys.platform == 'win32':
455
435
            # force binary mode for files
456
436
            import msvcrt
483
463
                self._out.flush()
484
464
                return
485
465
            bytes = self.read_bytes(bytes_to_read)
486
 
            if bytes == b'':
 
466
            if bytes == '':
487
467
                # Connection has been closed.
488
468
                self.finished = True
489
469
                self._out.flush()
506
486
            data is available.
507
487
        """
508
488
        if (getattr(self._in, 'fileno', None) is None
509
 
                or sys.platform == 'win32'):
 
489
            or sys.platform == 'win32'):
510
490
            # You can't select() file descriptors on Windows.
511
491
            return
512
 
        try:
513
 
            return self._wait_on_descriptor(self._in, timeout_seconds)
514
 
        except io.UnsupportedOperation:
515
 
            return
 
492
        return self._wait_on_descriptor(self._in, timeout_seconds)
516
493
 
517
494
    def _read_bytes(self, desired_count):
518
495
        return self._in.read(desired_count)
647
624
 
648
625
    def read_line(self):
649
626
        line = self._read_line()
650
 
        if not line.endswith(b'\n'):
 
627
        if not line.endswith('\n'):
651
628
            # end of file encountered reading from server
652
629
            raise errors.ConnectionReset(
653
630
                "Unexpected end of message. Please check connectivity "
679
656
            # A method we don't know about doesn't count as a VFS method.
680
657
            return
681
658
        if issubclass(request_method, vfs.VfsRequest):
682
 
            raise HpssVfsRequestNotAllowed(params.method, params.args)
 
659
            raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
683
660
 
684
661
 
685
662
class _DebugCounter(object):
686
663
    """An object that counts the HPSS calls made to each client medium.
687
664
 
688
665
    When a medium is garbage-collected, or failing that when
689
 
    breezy.global_state exits, the total number of calls made on that medium
 
666
    brzlib.global_state exits, the total number of calls made on that medium
690
667
    are reported via trace.note.
691
668
    """
692
669
 
694
671
        self.counts = weakref.WeakKeyDictionary()
695
672
        client._SmartClient.hooks.install_named_hook(
696
673
            'call', self.increment_call_count, 'hpss call counter')
697
 
        breezy.get_global_state().exit_stack.callback(self.flush_all)
 
674
        brzlib.global_state.cleanups.add_cleanup(self.flush_all)
698
675
 
699
676
    def track(self, medium):
700
677
        """Start tracking calls made to a medium.
741
718
        for ref in list(self.counts.keys()):
742
719
            self.done(ref)
743
720
 
744
 
 
745
721
_debug_counter = None
746
722
_vfs_refuser = None
747
723
 
799
775
        :seealso: _is_remote_before
800
776
        """
801
777
        if (self._remote_version_is_before is not None and
802
 
                version_tuple > self._remote_version_is_before):
 
778
            version_tuple > self._remote_version_is_before):
803
779
            # We have been told that the remote side is older than some version
804
780
            # which is newer than a previously supplied older-than version.
805
781
            # This indicates that some smart verb call is not guarded
806
782
            # appropriately (it should simply not have been tried).
807
783
            trace.mutter(
808
784
                "_remember_remote_is_before(%r) called, but "
809
 
                "_remember_remote_is_before(%r) was called previously.", version_tuple, self._remote_version_is_before)
 
785
                "_remember_remote_is_before(%r) was called previously."
 
786
                , version_tuple, self._remote_version_is_before)
810
787
            if 'hpss' in debug.debug_flags:
811
788
                ui.ui_factory.show_warning(
812
789
                    "_remember_remote_is_before(%r) called, but "
824
801
                medium_request = self.get_request()
825
802
                # Send a 'hello' request in protocol version one, for maximum
826
803
                # backwards compatibility.
827
 
                client_protocol = protocol.SmartClientRequestProtocolOne(
828
 
                    medium_request)
 
804
                client_protocol = protocol.SmartClientRequestProtocolOne(medium_request)
829
805
                client_protocol.query_version()
830
806
                self._done_hello = True
831
 
            except errors.SmartProtocolError as e:
 
807
            except errors.SmartProtocolError, e:
832
808
                # Cache the error, just like we would cache a successful
833
809
                # result.
834
810
                self._protocol_version_error = e
927
903
        self._readable_pipe = readable_pipe
928
904
        self._writeable_pipe = writeable_pipe
929
905
 
930
 
    def _accept_bytes(self, data):
 
906
    def _accept_bytes(self, bytes):
931
907
        """See SmartClientStreamMedium.accept_bytes."""
932
908
        try:
933
 
            self._writeable_pipe.write(data)
934
 
        except IOError as e:
 
909
            self._writeable_pipe.write(bytes)
 
910
        except IOError, e:
935
911
            if e.errno in (errno.EINVAL, errno.EPIPE):
936
912
                raise errors.ConnectionReset(
937
913
                    "Error trying to write to subprocess", e)
938
914
            raise
939
 
        self._report_activity(len(data), 'write')
 
915
        self._report_activity(len(bytes), 'write')
940
916
 
941
917
    def _flush(self):
942
918
        """See SmartClientStreamMedium._flush()."""
948
924
    def _read_bytes(self, count):
949
925
        """See SmartClientStreamMedium._read_bytes."""
950
926
        bytes_to_read = min(count, _MAX_READ_SIZE)
951
 
        data = self._readable_pipe.read(bytes_to_read)
952
 
        self._report_activity(len(data), 'read')
953
 
        return data
 
927
        bytes = self._readable_pipe.read(bytes_to_read)
 
928
        self._report_activity(len(bytes), 'read')
 
929
        return bytes
954
930
 
955
931
 
956
932
class SSHParams(object):
957
933
    """A set of parameters for starting a remote bzr via SSH."""
958
934
 
959
935
    def __init__(self, host, port=None, username=None, password=None,
960
 
                 bzr_remote_path='bzr'):
 
936
            bzr_remote_path='bzr'):
961
937
        self.host = host
962
938
        self.port = port
963
939
        self.username = username
977
953
 
978
954
        :param ssh_params: A SSHParams instance.
979
955
        :param vendor: An optional override for the ssh vendor to use. See
980
 
            breezy.transport.ssh for details on ssh vendors.
 
956
            brzlib.transport.ssh for details on ssh vendors.
981
957
        """
982
958
        self._real_medium = None
983
959
        self._ssh_params = ssh_params
1030
1006
        else:
1031
1007
            vendor = self._vendor
1032
1008
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
1033
 
                                                  self._ssh_params.password, self._ssh_params.host,
1034
 
                                                  self._ssh_params.port,
1035
 
                                                  command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
1036
 
                                                           '--directory=/', '--allow-writes'])
 
1009
                self._ssh_params.password, self._ssh_params.host,
 
1010
                self._ssh_params.port,
 
1011
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
 
1012
                         '--directory=/', '--allow-writes'])
1037
1013
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
1038
1014
        if io_kind == 'socket':
1039
1015
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
1128
1104
            port = int(self._port)
1129
1105
        try:
1130
1106
            sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
1131
 
                                           socket.SOCK_STREAM, 0, 0)
1132
 
        except socket.gaierror as xxx_todo_changeme:
1133
 
            (err_num, err_msg) = xxx_todo_changeme.args
 
1107
                socket.SOCK_STREAM, 0, 0)
 
1108
        except socket.gaierror, (err_num, err_msg):
1134
1109
            raise errors.ConnectionError("failed to lookup %s:%d: %s" %
1135
 
                                         (self._host, port, err_msg))
 
1110
                    (self._host, port, err_msg))
1136
1111
        # Initialize err in case there are no addresses returned:
1137
 
        last_err = socket.error("no address found for %s" % self._host)
 
1112
        err = socket.error("no address found for %s" % self._host)
1138
1113
        for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
1139
1114
            try:
1140
1115
                self._socket = socket.socket(family, socktype, proto)
1141
1116
                self._socket.setsockopt(socket.IPPROTO_TCP,
1142
1117
                                        socket.TCP_NODELAY, 1)
1143
1118
                self._socket.connect(sockaddr)
1144
 
            except socket.error as err:
 
1119
            except socket.error, err:
1145
1120
                if self._socket is not None:
1146
1121
                    self._socket.close()
1147
1122
                self._socket = None
1148
 
                last_err = err
1149
1123
                continue
1150
1124
            break
1151
1125
        if self._socket is None:
1152
1126
            # socket errors either have a (string) or (errno, string) as their
1153
1127
            # args.
1154
 
            if isinstance(last_err.args, str):
1155
 
                err_msg = last_err.args
 
1128
            if type(err.args) is str:
 
1129
                err_msg = err.args
1156
1130
            else:
1157
 
                err_msg = last_err.args[1]
 
1131
                err_msg = err.args[1]
1158
1132
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1159
 
                                         (self._host, port, err_msg))
 
1133
                    (self._host, port, err_msg))
1160
1134
        self._connected = True
1161
1135
        for hook in transport.Transport.hooks["post_connect"]:
1162
1136
            hook(self)
1164
1138
 
1165
1139
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
1166
1140
    """A client medium for an already connected socket.
1167
 
 
 
1141
    
1168
1142
    Note that this class will assume it "owns" the socket, so it will close it
1169
1143
    when its disconnect method is called.
1170
1144
    """