/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/bzr/smart/medium.py

  • Committer: Breezy landing bot
  • Author(s): Colin Watson
  • Date: 2020-11-16 21:47:08 UTC
  • mfrom: (7521.1.1 remove-lp-workaround)
  • Revision ID: breezy.the.bot@gmail.com-20201116214708-jos209mgxi41oy15
Remove breezy.git workaround for bazaar.launchpad.net.

Merged from https://code.launchpad.net/~cjwatson/brz/remove-lp-workaround/+merge/393710

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
 
22
22
Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
23
23
over SSH), and pass them to and from the protocol logic.  See the overview in
24
 
bzrlib/transport/smart/__init__.py.
 
24
breezy/transport/smart/__init__.py.
25
25
"""
26
26
 
27
 
from __future__ import absolute_import
28
 
 
29
27
import errno
 
28
import io
30
29
import os
31
30
import sys
32
31
import time
33
32
 
34
 
import bzrlib
35
 
from bzrlib.lazy_import import lazy_import
 
33
try:
 
34
    import _thread
 
35
except ImportError:
 
36
    import thread as _thread
 
37
 
 
38
import breezy
 
39
from ...lazy_import import lazy_import
36
40
lazy_import(globals(), """
37
41
import select
38
42
import socket
39
 
import thread
40
43
import weakref
41
44
 
42
 
from bzrlib import (
 
45
from breezy import (
43
46
    debug,
44
 
    errors,
45
47
    trace,
46
48
    transport,
47
49
    ui,
48
50
    urlutils,
49
51
    )
50
 
from bzrlib.i18n import gettext
51
 
from bzrlib.smart import client, protocol, request, signals, vfs
52
 
from bzrlib.transport import ssh
 
52
from breezy.i18n import gettext
 
53
from breezy.bzr.smart import client, protocol, request, signals, vfs
 
54
from breezy.transport import ssh
53
55
""")
54
 
from bzrlib import osutils
 
56
from ... import (
 
57
    errors,
 
58
    osutils,
 
59
    )
55
60
 
56
61
# Throughout this module buffer size parameters are either limited to be at
57
62
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
59
64
# from non-sockets as well.
60
65
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
61
66
 
 
67
 
 
68
class HpssVfsRequestNotAllowed(errors.BzrError):
 
69
 
 
70
    _fmt = ("VFS requests over the smart server are not allowed. Encountered: "
 
71
            "%(method)s, %(arguments)s.")
 
72
 
 
73
    def __init__(self, method, arguments):
 
74
        self.method = method
 
75
        self.arguments = arguments
 
76
 
 
77
 
62
78
def _get_protocol_factory_for_bytes(bytes):
63
79
    """Determine the right protocol factory for 'bytes'.
64
80
 
100
116
    :returns: a tuple of two strs: (line, excess)
101
117
    """
102
118
    newline_pos = -1
103
 
    bytes = ''
 
119
    bytes = b''
104
120
    while newline_pos == -1:
105
121
        new_bytes = read_bytes_func(1)
106
122
        bytes += new_bytes
107
 
        if new_bytes == '':
 
123
        if new_bytes == b'':
108
124
            # Ran out of bytes before receiving a complete line.
109
 
            return bytes, ''
110
 
        newline_pos = bytes.find('\n')
111
 
    line = bytes[:newline_pos+1]
112
 
    excess = bytes[newline_pos+1:]
 
125
            return bytes, b''
 
126
        newline_pos = bytes.find(b'\n')
 
127
    line = bytes[:newline_pos + 1]
 
128
    excess = bytes[newline_pos + 1:]
113
129
    return line, excess
114
130
 
115
131
 
119
135
    def __init__(self):
120
136
        self._push_back_buffer = None
121
137
 
122
 
    def _push_back(self, bytes):
 
138
    def _push_back(self, data):
123
139
        """Return unused bytes to the medium, because they belong to the next
124
140
        request(s).
125
141
 
126
142
        This sets the _push_back_buffer to the given bytes.
127
143
        """
 
144
        if not isinstance(data, bytes):
 
145
            raise TypeError(data)
128
146
        if self._push_back_buffer is not None:
129
147
            raise AssertionError(
130
148
                "_push_back called when self._push_back_buffer is %r"
131
149
                % (self._push_back_buffer,))
132
 
        if bytes == '':
 
150
        if data == b'':
133
151
            return
134
 
        self._push_back_buffer = bytes
 
152
        self._push_back_buffer = data
135
153
 
136
154
    def _get_push_back_buffer(self):
137
 
        if self._push_back_buffer == '':
 
155
        if self._push_back_buffer == b'':
138
156
            raise AssertionError(
139
157
                '%s._push_back_buffer should never be the empty string, '
140
158
                'which can be confused with EOF' % (self,))
233
251
            while not self.finished:
234
252
                server_protocol = self._build_protocol()
235
253
                self._serve_one_request(server_protocol)
236
 
        except errors.ConnectionTimeout, e:
 
254
        except errors.ConnectionTimeout as e:
237
255
            trace.note('%s' % (e,))
238
256
            trace.log_exception_quietly()
239
257
            self._disconnect_client()
240
258
            # We reported it, no reason to make a big fuss.
241
259
            return
242
 
        except Exception, e:
 
260
        except Exception as e:
243
261
            stderr.write("%s terminating on exception %s\n" % (self, e))
244
262
            raise
245
263
        self._disconnect_client()
308
326
                    # select.error doesn't have 'errno', it just has args[0]
309
327
                    err = e.args[0]
310
328
                if err in _bad_file_descriptor:
311
 
                    return # Not a socket indicates read() will fail
 
329
                    return  # Not a socket indicates read() will fail
312
330
                elif err == errno.EINTR:
313
331
                    # Interrupted, keep looping.
314
332
                    continue
315
333
                raise
 
334
            except ValueError:
 
335
                return  # Socket may already be closed
316
336
        if rs or xs:
317
337
            return
318
338
        raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
329
349
            self._serve_one_request_unguarded(protocol)
330
350
        except KeyboardInterrupt:
331
351
            raise
332
 
        except Exception, e:
 
352
        except Exception as e:
333
353
            self.terminate_due_to_error()
334
354
 
335
355
    def terminate_due_to_error(self):
369
389
 
370
390
    def __repr__(self):
371
391
        return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
372
 
            self._client_info)
 
392
                                     self._client_info)
373
393
 
374
394
    def _serve_one_request_unguarded(self, protocol):
375
395
        while protocol.next_read_size():
377
397
            # than MAX_SOCKET_CHUNK ready, the socket will just return a
378
398
            # short read immediately rather than block.
379
399
            bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
380
 
            if bytes == '':
 
400
            if bytes == b'':
381
401
                self.finished = True
382
402
                return
383
403
            protocol.accept_bytes(bytes)
411
431
        self.finished = True
412
432
 
413
433
    def _write_out(self, bytes):
414
 
        tstart = osutils.timer_func()
 
434
        tstart = osutils.perf_counter()
415
435
        osutils.send_all(self.socket, bytes, self._report_activity)
416
436
        if 'hpss' in debug.debug_flags:
417
 
            thread_id = thread.get_ident()
 
437
            thread_id = _thread.get_ident()
418
438
            trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
419
439
                         % ('wrote', thread_id, len(bytes),
420
 
                            osutils.timer_func() - tstart))
 
440
                            osutils.perf_counter() - tstart))
421
441
 
422
442
 
423
443
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
430
450
        :param backing_transport: Transport for the directory served.
431
451
        """
432
452
        SmartServerStreamMedium.__init__(self, backing_transport,
433
 
            timeout=timeout)
 
453
                                         timeout=timeout)
434
454
        if sys.platform == 'win32':
435
455
            # force binary mode for files
436
456
            import msvcrt
463
483
                self._out.flush()
464
484
                return
465
485
            bytes = self.read_bytes(bytes_to_read)
466
 
            if bytes == '':
 
486
            if bytes == b'':
467
487
                # Connection has been closed.
468
488
                self.finished = True
469
489
                self._out.flush()
486
506
            data is available.
487
507
        """
488
508
        if (getattr(self._in, 'fileno', None) is None
489
 
            or sys.platform == 'win32'):
 
509
                or sys.platform == 'win32'):
490
510
            # You can't select() file descriptors on Windows.
491
511
            return
492
 
        return self._wait_on_descriptor(self._in, timeout_seconds)
 
512
        try:
 
513
            return self._wait_on_descriptor(self._in, timeout_seconds)
 
514
        except io.UnsupportedOperation:
 
515
            return
493
516
 
494
517
    def _read_bytes(self, desired_count):
495
518
        return self._in.read(desired_count)
624
647
 
625
648
    def read_line(self):
626
649
        line = self._read_line()
627
 
        if not line.endswith('\n'):
 
650
        if not line.endswith(b'\n'):
628
651
            # end of file encountered reading from server
629
652
            raise errors.ConnectionReset(
630
653
                "Unexpected end of message. Please check connectivity "
656
679
            # A method we don't know about doesn't count as a VFS method.
657
680
            return
658
681
        if issubclass(request_method, vfs.VfsRequest):
659
 
            raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
 
682
            raise HpssVfsRequestNotAllowed(params.method, params.args)
660
683
 
661
684
 
662
685
class _DebugCounter(object):
663
686
    """An object that counts the HPSS calls made to each client medium.
664
687
 
665
688
    When a medium is garbage-collected, or failing that when
666
 
    bzrlib.global_state exits, the total number of calls made on that medium
 
689
    breezy.global_state exits, the total number of calls made on that medium
667
690
    are reported via trace.note.
668
691
    """
669
692
 
671
694
        self.counts = weakref.WeakKeyDictionary()
672
695
        client._SmartClient.hooks.install_named_hook(
673
696
            'call', self.increment_call_count, 'hpss call counter')
674
 
        bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
 
697
        breezy.get_global_state().exit_stack.callback(self.flush_all)
675
698
 
676
699
    def track(self, medium):
677
700
        """Start tracking calls made to a medium.
718
741
        for ref in list(self.counts.keys()):
719
742
            self.done(ref)
720
743
 
 
744
 
721
745
_debug_counter = None
722
746
_vfs_refuser = None
723
747
 
775
799
        :seealso: _is_remote_before
776
800
        """
777
801
        if (self._remote_version_is_before is not None and
778
 
            version_tuple > self._remote_version_is_before):
 
802
                version_tuple > self._remote_version_is_before):
779
803
            # We have been told that the remote side is older than some version
780
804
            # which is newer than a previously supplied older-than version.
781
805
            # This indicates that some smart verb call is not guarded
782
806
            # appropriately (it should simply not have been tried).
783
807
            trace.mutter(
784
808
                "_remember_remote_is_before(%r) called, but "
785
 
                "_remember_remote_is_before(%r) was called previously."
786
 
                , version_tuple, self._remote_version_is_before)
 
809
                "_remember_remote_is_before(%r) was called previously.", version_tuple, self._remote_version_is_before)
787
810
            if 'hpss' in debug.debug_flags:
788
811
                ui.ui_factory.show_warning(
789
812
                    "_remember_remote_is_before(%r) called, but "
801
824
                medium_request = self.get_request()
802
825
                # Send a 'hello' request in protocol version one, for maximum
803
826
                # backwards compatibility.
804
 
                client_protocol = protocol.SmartClientRequestProtocolOne(medium_request)
 
827
                client_protocol = protocol.SmartClientRequestProtocolOne(
 
828
                    medium_request)
805
829
                client_protocol.query_version()
806
830
                self._done_hello = True
807
 
            except errors.SmartProtocolError, e:
 
831
            except errors.SmartProtocolError as e:
808
832
                # Cache the error, just like we would cache a successful
809
833
                # result.
810
834
                self._protocol_version_error = e
903
927
        self._readable_pipe = readable_pipe
904
928
        self._writeable_pipe = writeable_pipe
905
929
 
906
 
    def _accept_bytes(self, bytes):
 
930
    def _accept_bytes(self, data):
907
931
        """See SmartClientStreamMedium.accept_bytes."""
908
932
        try:
909
 
            self._writeable_pipe.write(bytes)
910
 
        except IOError, e:
 
933
            self._writeable_pipe.write(data)
 
934
        except IOError as e:
911
935
            if e.errno in (errno.EINVAL, errno.EPIPE):
912
936
                raise errors.ConnectionReset(
913
937
                    "Error trying to write to subprocess", e)
914
938
            raise
915
 
        self._report_activity(len(bytes), 'write')
 
939
        self._report_activity(len(data), 'write')
916
940
 
917
941
    def _flush(self):
918
942
        """See SmartClientStreamMedium._flush()."""
924
948
    def _read_bytes(self, count):
925
949
        """See SmartClientStreamMedium._read_bytes."""
926
950
        bytes_to_read = min(count, _MAX_READ_SIZE)
927
 
        bytes = self._readable_pipe.read(bytes_to_read)
928
 
        self._report_activity(len(bytes), 'read')
929
 
        return bytes
 
951
        data = self._readable_pipe.read(bytes_to_read)
 
952
        self._report_activity(len(data), 'read')
 
953
        return data
930
954
 
931
955
 
932
956
class SSHParams(object):
933
957
    """A set of parameters for starting a remote bzr via SSH."""
934
958
 
935
959
    def __init__(self, host, port=None, username=None, password=None,
936
 
            bzr_remote_path='bzr'):
 
960
                 bzr_remote_path='bzr'):
937
961
        self.host = host
938
962
        self.port = port
939
963
        self.username = username
953
977
 
954
978
        :param ssh_params: A SSHParams instance.
955
979
        :param vendor: An optional override for the ssh vendor to use. See
956
 
            bzrlib.transport.ssh for details on ssh vendors.
 
980
            breezy.transport.ssh for details on ssh vendors.
957
981
        """
958
982
        self._real_medium = None
959
983
        self._ssh_params = ssh_params
1006
1030
        else:
1007
1031
            vendor = self._vendor
1008
1032
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
1009
 
                self._ssh_params.password, self._ssh_params.host,
1010
 
                self._ssh_params.port,
1011
 
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
1012
 
                         '--directory=/', '--allow-writes'])
 
1033
                                                  self._ssh_params.password, self._ssh_params.host,
 
1034
                                                  self._ssh_params.port,
 
1035
                                                  command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
 
1036
                                                           '--directory=/', '--allow-writes'])
1013
1037
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
1014
1038
        if io_kind == 'socket':
1015
1039
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
1104
1128
            port = int(self._port)
1105
1129
        try:
1106
1130
            sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
1107
 
                socket.SOCK_STREAM, 0, 0)
1108
 
        except socket.gaierror, (err_num, err_msg):
 
1131
                                           socket.SOCK_STREAM, 0, 0)
 
1132
        except socket.gaierror as xxx_todo_changeme:
 
1133
            (err_num, err_msg) = xxx_todo_changeme.args
1109
1134
            raise errors.ConnectionError("failed to lookup %s:%d: %s" %
1110
 
                    (self._host, port, err_msg))
 
1135
                                         (self._host, port, err_msg))
1111
1136
        # Initialize err in case there are no addresses returned:
1112
 
        err = socket.error("no address found for %s" % self._host)
 
1137
        last_err = socket.error("no address found for %s" % self._host)
1113
1138
        for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
1114
1139
            try:
1115
1140
                self._socket = socket.socket(family, socktype, proto)
1116
1141
                self._socket.setsockopt(socket.IPPROTO_TCP,
1117
1142
                                        socket.TCP_NODELAY, 1)
1118
1143
                self._socket.connect(sockaddr)
1119
 
            except socket.error, err:
 
1144
            except socket.error as err:
1120
1145
                if self._socket is not None:
1121
1146
                    self._socket.close()
1122
1147
                self._socket = None
 
1148
                last_err = err
1123
1149
                continue
1124
1150
            break
1125
1151
        if self._socket is None:
1126
1152
            # socket errors either have a (string) or (errno, string) as their
1127
1153
            # args.
1128
 
            if type(err.args) is str:
1129
 
                err_msg = err.args
 
1154
            if isinstance(last_err.args, str):
 
1155
                err_msg = last_err.args
1130
1156
            else:
1131
 
                err_msg = err.args[1]
 
1157
                err_msg = last_err.args[1]
1132
1158
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1133
 
                    (self._host, port, err_msg))
 
1159
                                         (self._host, port, err_msg))
1134
1160
        self._connected = True
1135
1161
        for hook in transport.Transport.hooks["post_connect"]:
1136
1162
            hook(self)
1138
1164
 
1139
1165
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
1140
1166
    """A client medium for an already connected socket.
1141
 
    
 
1167
 
1142
1168
    Note that this class will assume it "owns" the socket, so it will close it
1143
1169
    when its disconnect method is called.
1144
1170
    """