/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/bzr/smart/client.py

  • Committer: Jelmer Vernooij
  • Date: 2019-06-03 21:25:01 UTC
  • mto: This revision was merged to the branch mainline in revision 7318.
  • Revision ID: jelmer@jelmer.uk-20190603212501-zgt2czrlc6oqoi7a
Fix tests on python 2.

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
import bzrlib
18
 
from bzrlib.smart import message, protocol
19
 
from bzrlib.trace import warning
20
 
from bzrlib import (
 
17
from __future__ import absolute_import
 
18
 
 
19
from ... import lazy_import
 
20
lazy_import.lazy_import(globals(), """
 
21
from breezy.bzr.smart import request as _mod_request
 
22
""")
 
23
 
 
24
import breezy
 
25
from . import message, protocol
 
26
from ... import (
 
27
    debug,
21
28
    errors,
22
29
    hooks,
 
30
    trace,
23
31
    )
24
32
 
25
33
 
32
40
        """
33
41
        self._medium = medium
34
42
        if headers is None:
35
 
            self._headers = {'Software version': bzrlib.__version__}
 
43
            self._headers = {
 
44
                b'Software version': breezy.__version__.encode('utf-8')}
36
45
        else:
37
46
            self._headers = dict(headers)
38
47
 
39
48
    def __repr__(self):
40
49
        return '%s(%r)' % (self.__class__.__name__, self._medium)
41
50
 
42
 
    def _send_request(self, protocol_version, method, args, body=None,
43
 
                      readv_body=None, body_stream=None):
44
 
        encoder, response_handler = self._construct_protocol(
45
 
            protocol_version)
46
 
        encoder.set_headers(self._headers)
47
 
        if body is not None:
48
 
            if readv_body is not None:
49
 
                raise AssertionError(
50
 
                    "body and readv_body are mutually exclusive.")
51
 
            if body_stream is not None:
52
 
                raise AssertionError(
53
 
                    "body and body_stream are mutually exclusive.")
54
 
            encoder.call_with_body_bytes((method, ) + args, body)
55
 
        elif readv_body is not None:
56
 
            if body_stream is not None:
57
 
                raise AssertionError(
58
 
                    "readv_body and body_stream are mutually exclusive.")
59
 
            encoder.call_with_body_readv_array((method, ) + args, readv_body)
60
 
        elif body_stream is not None:
61
 
            encoder.call_with_body_stream((method, ) + args, body_stream)
62
 
        else:
63
 
            encoder.call(method, *args)
64
 
        return response_handler
65
 
 
66
 
    def _run_call_hooks(self, method, args, body, readv_body):
67
 
        if not _SmartClient.hooks['call']:
68
 
            return
69
 
        params = CallHookParams(method, args, body, readv_body, self._medium)
70
 
        for hook in _SmartClient.hooks['call']:
71
 
            hook(params)
72
 
 
73
51
    def _call_and_read_response(self, method, args, body=None, readv_body=None,
74
 
            body_stream=None, expect_response_body=True):
75
 
        self._run_call_hooks(method, args, body, readv_body)
76
 
        if self._medium._protocol_version is not None:
77
 
            response_handler = self._send_request(
78
 
                self._medium._protocol_version, method, args, body=body,
79
 
                readv_body=readv_body, body_stream=body_stream)
80
 
            return (response_handler.read_response_tuple(
81
 
                        expect_body=expect_response_body),
82
 
                    response_handler)
83
 
        else:
84
 
            for protocol_version in [3, 2]:
85
 
                if protocol_version == 2:
86
 
                    # If v3 doesn't work, the remote side is older than 1.6.
87
 
                    self._medium._remember_remote_is_before((1, 6))
88
 
                response_handler = self._send_request(
89
 
                    protocol_version, method, args, body=body,
90
 
                    readv_body=readv_body, body_stream=body_stream)
91
 
                try:
92
 
                    response_tuple = response_handler.read_response_tuple(
93
 
                        expect_body=expect_response_body)
94
 
                except errors.UnexpectedProtocolVersionMarker, err:
95
 
                    # TODO: We could recover from this without disconnecting if
96
 
                    # we recognise the protocol version.
97
 
                    warning(
98
 
                        'Server does not understand Bazaar network protocol %d,'
99
 
                        ' reconnecting.  (Upgrade the server to avoid this.)'
100
 
                        % (protocol_version,))
101
 
                    self._medium.disconnect()
102
 
                    continue
103
 
                except errors.ErrorFromSmartServer:
104
 
                    # If we received an error reply from the server, then it
105
 
                    # must be ok with this protocol version.
106
 
                    self._medium._protocol_version = protocol_version
107
 
                    raise
108
 
                else:
109
 
                    self._medium._protocol_version = protocol_version
110
 
                    return response_tuple, response_handler
111
 
            raise errors.SmartProtocolError(
112
 
                'Server is not a Bazaar server: ' + str(err))
113
 
 
114
 
    def _construct_protocol(self, version):
115
 
        request = self._medium.get_request()
116
 
        if version == 3:
117
 
            request_encoder = protocol.ProtocolThreeRequester(request)
118
 
            response_handler = message.ConventionalResponseHandler()
119
 
            response_proto = protocol.ProtocolThreeDecoder(
120
 
                response_handler, expect_version_marker=True)
121
 
            response_handler.setProtoAndMediumRequest(response_proto, request)
122
 
        elif version == 2:
123
 
            request_encoder = protocol.SmartClientRequestProtocolTwo(request)
124
 
            response_handler = request_encoder
125
 
        else:
126
 
            request_encoder = protocol.SmartClientRequestProtocolOne(request)
127
 
            response_handler = request_encoder
128
 
        return request_encoder, response_handler
 
52
                                body_stream=None, expect_response_body=True):
 
53
        request = _SmartClientRequest(self, method, args, body=body,
 
54
                                      readv_body=readv_body, body_stream=body_stream,
 
55
                                      expect_response_body=expect_response_body)
 
56
        return request.call_and_read_response()
129
57
 
130
58
    def call(self, method, *args):
131
59
        """Call a method on the remote server."""
146
74
 
147
75
    def call_with_body_bytes(self, method, args, body):
148
76
        """Call a method on the remote server with body bytes."""
149
 
        if type(method) is not str:
 
77
        if not isinstance(method, bytes):
150
78
            raise TypeError('method must be a byte string, not %r' % (method,))
151
79
        for arg in args:
152
 
            if type(arg) is not str:
 
80
            if not isinstance(arg, bytes):
153
81
                raise TypeError('args must be byte strings, not %r' % (args,))
154
 
        if type(body) is not str:
 
82
        if not isinstance(body, bytes):
155
83
            raise TypeError('body must be byte string, not %r' % (body,))
156
84
        response, response_handler = self._call_and_read_response(
157
85
            method, args, body=body, expect_response_body=False)
159
87
 
160
88
    def call_with_body_bytes_expecting_body(self, method, args, body):
161
89
        """Call a method on the remote server with body bytes."""
162
 
        if type(method) is not str:
 
90
        if not isinstance(method, bytes):
163
91
            raise TypeError('method must be a byte string, not %r' % (method,))
164
92
        for arg in args:
165
 
            if type(arg) is not str:
 
93
            if not isinstance(arg, bytes):
166
94
                raise TypeError('args must be byte strings, not %r' % (args,))
167
 
        if type(body) is not str:
 
95
        if not isinstance(body, bytes):
168
96
            raise TypeError('body must be byte string, not %r' % (body,))
169
97
        response, response_handler = self._call_and_read_response(
170
98
            method, args, body=body, expect_response_body=True)
172
100
 
173
101
    def call_with_body_readv_array(self, args, body):
174
102
        response, response_handler = self._call_and_read_response(
175
 
                args[0], args[1:], readv_body=body, expect_response_body=True)
 
103
            args[0], args[1:], readv_body=body, expect_response_body=True)
176
104
        return (response, response_handler)
177
105
 
178
106
    def call_with_body_stream(self, args, stream):
179
107
        response, response_handler = self._call_and_read_response(
180
 
                args[0], args[1:], body_stream=stream,
181
 
                expect_response_body=False)
 
108
            args[0], args[1:], body_stream=stream,
 
109
            expect_response_body=False)
182
110
        return (response, response_handler)
183
111
 
184
112
    def remote_path_from_transport(self, transport):
188
116
        anything but path, so it is only safe to use it in requests sent over
189
117
        the medium from the matching transport.
190
118
        """
191
 
        return self._medium.remote_path_from_transport(transport)
 
119
        return self._medium.remote_path_from_transport(transport).encode('utf-8')
 
120
 
 
121
 
 
122
class _SmartClientRequest(object):
 
123
    """Encapsulate the logic for a single request.
 
124
 
 
125
    This class handles things like reconnecting and sending the request a
 
126
    second time when the connection is reset in the middle. It also handles the
 
127
    multiple requests that get made if we don't know what protocol the server
 
128
    supports yet.
 
129
 
 
130
    Generally, you build up one of these objects, passing in the arguments that
 
131
    you want to send to the server, and then use 'call_and_read_response' to
 
132
    get the response from the server.
 
133
    """
 
134
 
 
135
    def __init__(self, client, method, args, body=None, readv_body=None,
 
136
                 body_stream=None, expect_response_body=True):
 
137
        self.client = client
 
138
        self.method = method
 
139
        self.args = args
 
140
        self.body = body
 
141
        self.readv_body = readv_body
 
142
        self.body_stream = body_stream
 
143
        self.expect_response_body = expect_response_body
 
144
 
 
145
    def call_and_read_response(self):
 
146
        """Send the request to the server, and read the initial response.
 
147
 
 
148
        This doesn't read all of the body content of the response, instead it
 
149
        returns (response_tuple, response_handler). response_tuple is the 'ok',
 
150
        or 'error' information, and 'response_handler' can be used to get the
 
151
        content stream out.
 
152
        """
 
153
        self._run_call_hooks()
 
154
        protocol_version = self.client._medium._protocol_version
 
155
        if protocol_version is None:
 
156
            return self._call_determining_protocol_version()
 
157
        else:
 
158
            return self._call(protocol_version)
 
159
 
 
160
    def _is_safe_to_send_twice(self):
 
161
        """Check if the current method is re-entrant safe."""
 
162
        if self.body_stream is not None or 'noretry' in debug.debug_flags:
 
163
            # We can't restart a body stream that has already been consumed.
 
164
            return False
 
165
        request_type = _mod_request.request_handlers.get_info(self.method)
 
166
        if request_type in ('read', 'idem', 'semi'):
 
167
            return True
 
168
        # If we have gotten this far, 'stream' cannot be retried, because we
 
169
        # already consumed the local stream.
 
170
        if request_type in ('semivfs', 'mutate', 'stream'):
 
171
            return False
 
172
        trace.mutter('Unknown request type: %s for method %s'
 
173
                     % (request_type, self.method))
 
174
        return False
 
175
 
 
176
    def _run_call_hooks(self):
 
177
        if not _SmartClient.hooks['call']:
 
178
            return
 
179
        params = CallHookParams(self.method, self.args, self.body,
 
180
                                self.readv_body, self.client._medium)
 
181
        for hook in _SmartClient.hooks['call']:
 
182
            hook(params)
 
183
 
 
184
    def _call(self, protocol_version):
 
185
        """We know the protocol version.
 
186
 
 
187
        So this just sends the request, and then reads the response. This is
 
188
        where the code will be to retry requests if the connection is closed.
 
189
        """
 
190
        response_handler = self._send(protocol_version)
 
191
        try:
 
192
            response_tuple = response_handler.read_response_tuple(
 
193
                expect_body=self.expect_response_body)
 
194
        except errors.ConnectionReset as e:
 
195
            self.client._medium.reset()
 
196
            if not self._is_safe_to_send_twice():
 
197
                raise
 
198
            trace.warning('ConnectionReset reading response for %r, retrying'
 
199
                          % (self.method,))
 
200
            trace.log_exception_quietly()
 
201
            encoder, response_handler = self._construct_protocol(
 
202
                protocol_version)
 
203
            self._send_no_retry(encoder)
 
204
            response_tuple = response_handler.read_response_tuple(
 
205
                expect_body=self.expect_response_body)
 
206
        return (response_tuple, response_handler)
 
207
 
 
208
    def _call_determining_protocol_version(self):
 
209
        """Determine what protocol the remote server supports.
 
210
 
 
211
        We do this by placing a request in the most recent protocol, and
 
212
        handling the UnexpectedProtocolVersionMarker from the server.
 
213
        """
 
214
        last_err = None
 
215
        for protocol_version in [3, 2]:
 
216
            if protocol_version == 2:
 
217
                # If v3 doesn't work, the remote side is older than 1.6.
 
218
                self.client._medium._remember_remote_is_before((1, 6))
 
219
            try:
 
220
                response_tuple, response_handler = self._call(protocol_version)
 
221
            except errors.UnexpectedProtocolVersionMarker as err:
 
222
                # TODO: We could recover from this without disconnecting if
 
223
                # we recognise the protocol version.
 
224
                trace.warning(
 
225
                    'Server does not understand Bazaar network protocol %d,'
 
226
                    ' reconnecting.  (Upgrade the server to avoid this.)'
 
227
                    % (protocol_version,))
 
228
                self.client._medium.disconnect()
 
229
                last_err = err
 
230
                continue
 
231
            except errors.ErrorFromSmartServer:
 
232
                # If we received an error reply from the server, then it
 
233
                # must be ok with this protocol version.
 
234
                self.client._medium._protocol_version = protocol_version
 
235
                raise
 
236
            else:
 
237
                self.client._medium._protocol_version = protocol_version
 
238
                return response_tuple, response_handler
 
239
        raise errors.SmartProtocolError(
 
240
            'Server is not a Bazaar server: ' + str(last_err))
 
241
 
 
242
    def _construct_protocol(self, version):
 
243
        """Build the encoding stack for a given protocol version."""
 
244
        request = self.client._medium.get_request()
 
245
        if version == 3:
 
246
            request_encoder = protocol.ProtocolThreeRequester(request)
 
247
            response_handler = message.ConventionalResponseHandler()
 
248
            response_proto = protocol.ProtocolThreeDecoder(
 
249
                response_handler, expect_version_marker=True)
 
250
            response_handler.setProtoAndMediumRequest(response_proto, request)
 
251
        elif version == 2:
 
252
            request_encoder = protocol.SmartClientRequestProtocolTwo(request)
 
253
            response_handler = request_encoder
 
254
        else:
 
255
            request_encoder = protocol.SmartClientRequestProtocolOne(request)
 
256
            response_handler = request_encoder
 
257
        return request_encoder, response_handler
 
258
 
 
259
    def _send(self, protocol_version):
 
260
        """Encode the request, and send it to the server.
 
261
 
 
262
        This will retry a request if we get a ConnectionReset while sending the
 
263
        request to the server. (Unless we have a body_stream that we have
 
264
        already started consuming, since we can't restart body_streams)
 
265
 
 
266
        :return: response_handler as defined by _construct_protocol
 
267
        """
 
268
        encoder, response_handler = self._construct_protocol(protocol_version)
 
269
        try:
 
270
            self._send_no_retry(encoder)
 
271
        except errors.ConnectionReset as e:
 
272
            # If we fail during the _send_no_retry phase, then we can
 
273
            # be confident that the server did not get our request, because we
 
274
            # haven't started waiting for the reply yet. So try the request
 
275
            # again. We only issue a single retry, because if the connection
 
276
            # really is down, there is no reason to loop endlessly.
 
277
 
 
278
            # Connection is dead, so close our end of it.
 
279
            self.client._medium.reset()
 
280
            if (('noretry' in debug.debug_flags) or
 
281
                (self.body_stream is not None and
 
282
                    encoder.body_stream_started)):
 
283
                # We can't restart a body_stream that has been partially
 
284
                # consumed, so we don't retry.
 
285
                # Note: We don't have to worry about
 
286
                #   SmartClientRequestProtocolOne or Two, because they don't
 
287
                #   support client-side body streams.
 
288
                raise
 
289
            trace.warning('ConnectionReset calling %r, retrying'
 
290
                          % (self.method,))
 
291
            trace.log_exception_quietly()
 
292
            encoder, response_handler = self._construct_protocol(
 
293
                protocol_version)
 
294
            self._send_no_retry(encoder)
 
295
        return response_handler
 
296
 
 
297
    def _send_no_retry(self, encoder):
 
298
        """Just encode the request and try to send it."""
 
299
        encoder.set_headers(self.client._headers)
 
300
        if self.body is not None:
 
301
            if self.readv_body is not None:
 
302
                raise AssertionError(
 
303
                    "body and readv_body are mutually exclusive.")
 
304
            if self.body_stream is not None:
 
305
                raise AssertionError(
 
306
                    "body and body_stream are mutually exclusive.")
 
307
            encoder.call_with_body_bytes(
 
308
                (self.method, ) + self.args, self.body)
 
309
        elif self.readv_body is not None:
 
310
            if self.body_stream is not None:
 
311
                raise AssertionError(
 
312
                    "readv_body and body_stream are mutually exclusive.")
 
313
            encoder.call_with_body_readv_array((self.method, ) + self.args,
 
314
                                               self.readv_body)
 
315
        elif self.body_stream is not None:
 
316
            encoder.call_with_body_stream((self.method, ) + self.args,
 
317
                                          self.body_stream)
 
318
        else:
 
319
            encoder.call(self.method, *self.args)
192
320
 
193
321
 
194
322
class SmartClientHooks(hooks.Hooks):
195
323
 
196
324
    def __init__(self):
197
 
        hooks.Hooks.__init__(self)
198
 
        self.create_hook(hooks.HookPoint('call',
199
 
            "Called when the smart client is submitting a request to the "
200
 
            "smart server. Called with a bzrlib.smart.client.CallHookParams "
201
 
            "object. Streaming request bodies, and responses, are not "
202
 
            "accessible.", None, None))
 
325
        hooks.Hooks.__init__(
 
326
            self, "breezy.bzr.smart.client", "_SmartClient.hooks")
 
327
        self.add_hook('call',
 
328
                      "Called when the smart client is submitting a request to the "
 
329
                      "smart server. Called with a breezy.bzr.smart.client.CallHookParams "
 
330
                      "object. Streaming request bodies, and responses, are not "
 
331
                      "accessible.", None)
203
332
 
204
333
 
205
334
_SmartClient.hooks = SmartClientHooks()
215
344
        self.medium = medium
216
345
 
217
346
    def __repr__(self):
218
 
        attrs = dict((k, v) for (k, v) in self.__dict__.iteritems()
 
347
        attrs = dict((k, v) for k, v in self.__dict__.items()
219
348
                     if v is not None)
220
349
        return '<%s %r>' % (self.__class__.__name__, attrs)
221
350
 
222
351
    def __eq__(self, other):
223
 
        if type(other) is not type(self):
 
352
        if not isinstance(other, type(self)):
224
353
            return NotImplemented
225
354
        return self.__dict__ == other.__dict__
226
355