/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/message.py

  • Committer: John Arbash Meinel
  • Date: 2009-06-02 21:11:18 UTC
  • mto: This revision was merged to the branch mainline in revision 4412.
  • Revision ID: john@arbash-meinel.com-20090602211118-fjsx4dxokahrqkrr
Change groupcompress.DeltaIndex to be lazy about indexing the first source.

This changes the performance characteristics of 'commit', especially of large files.
The main benefit is that during commit, we won't be doing any deltas as we add
all new content to a new group anyway.
Thus we know that we won't ever use the delta index we were creating, so
we can save both time and memory by never creating the index until it is
needed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
import collections
 
18
from cStringIO import StringIO
 
19
 
 
20
from bzrlib import (
 
21
    debug,
 
22
    errors,
 
23
    )
 
24
from bzrlib.trace import mutter
 
25
 
 
26
 
 
27
class MessageHandler(object):
 
28
    """Base class for handling messages received via the smart protocol.
 
29
 
 
30
    As parts of a message are received, the corresponding PART_received method
 
31
    will be called.
 
32
    """
 
33
 
 
34
    def __init__(self):
 
35
        self.headers = None
 
36
 
 
37
    def headers_received(self, headers):
 
38
        """Called when message headers are received.
 
39
 
 
40
        This default implementation just stores them in self.headers.
 
41
        """
 
42
        self.headers = headers
 
43
 
 
44
    def byte_part_received(self, byte):
 
45
        """Called when a 'byte' part is received.
 
46
 
 
47
        Note that a 'byte' part is a message part consisting of exactly one
 
48
        byte.
 
49
        """
 
50
        raise NotImplementedError(self.byte_received)
 
51
 
 
52
    def bytes_part_received(self, bytes):
 
53
        """Called when a 'bytes' part is received.
 
54
 
 
55
        A 'bytes' message part can contain any number of bytes.  It should not
 
56
        be confused with a 'byte' part, which is always a single byte.
 
57
        """
 
58
        raise NotImplementedError(self.bytes_received)
 
59
 
 
60
    def structure_part_received(self, structure):
 
61
        """Called when a 'structure' part is received.
 
62
 
 
63
        :param structure: some structured data, which will be some combination
 
64
            of list, dict, int, and str objects.
 
65
        """
 
66
        raise NotImplementedError(self.bytes_received)
 
67
 
 
68
    def protocol_error(self, exception):
 
69
        """Called when there is a protocol decoding error.
 
70
 
 
71
        The default implementation just re-raises the exception.
 
72
        """
 
73
        raise
 
74
 
 
75
    def end_received(self):
 
76
        """Called when the end of the message is received."""
 
77
        # No-op by default.
 
78
        pass
 
79
 
 
80
 
 
81
class ConventionalRequestHandler(MessageHandler):
 
82
    """A message handler for "conventional" requests.
 
83
 
 
84
    "Conventional" is used in the sense described in
 
85
    doc/developers/network-protocol.txt: a simple message with arguments and an
 
86
    optional body.
 
87
 
 
88
    Possible states:
 
89
     * args: expecting args
 
90
     * body: expecting body (terminated by receiving a post-body status)
 
91
     * error: expecting post-body error
 
92
     * end: expecting end of message
 
93
     * nothing: finished
 
94
    """
 
95
 
 
96
    def __init__(self, request_handler, responder):
 
97
        MessageHandler.__init__(self)
 
98
        self.request_handler = request_handler
 
99
        self.responder = responder
 
100
        self.expecting = 'args'
 
101
        self._should_finish_body = False
 
102
        self._response_sent = False
 
103
 
 
104
    def protocol_error(self, exception):
 
105
        if self.responder.response_sent:
 
106
            # We can only send one response to a request, no matter how many
 
107
            # errors happen while processing it.
 
108
            return
 
109
        self.responder.send_error(exception)
 
110
 
 
111
    def byte_part_received(self, byte):
 
112
        if self.expecting == 'body':
 
113
            if byte == 'S':
 
114
                # Success.  Nothing more to come except the end of message.
 
115
                self.expecting = 'end'
 
116
            elif byte == 'E':
 
117
                # Error.  Expect an error structure.
 
118
                self.expecting = 'error'
 
119
            else:
 
120
                raise errors.SmartProtocolError(
 
121
                    'Non-success status byte in request body: %r' % (byte,))
 
122
        else:
 
123
            raise errors.SmartProtocolError(
 
124
                'Unexpected message part: byte(%r)' % (byte,))
 
125
 
 
126
    def structure_part_received(self, structure):
 
127
        if self.expecting == 'args':
 
128
            self._args_received(structure)
 
129
        elif self.expecting == 'error':
 
130
            self._error_received(structure)
 
131
        else:
 
132
            raise errors.SmartProtocolError(
 
133
                'Unexpected message part: structure(%r)' % (structure,))
 
134
 
 
135
    def _args_received(self, args):
 
136
        self.expecting = 'body'
 
137
        self.request_handler.dispatch_command(args[0], args[1:])
 
138
        if self.request_handler.finished_reading:
 
139
            self._response_sent = True
 
140
            self.responder.send_response(self.request_handler.response)
 
141
            self.expecting = 'end'
 
142
 
 
143
    def _error_received(self, error_args):
 
144
        self.expecting = 'end'
 
145
        self.request_handler.post_body_error_received(error_args)
 
146
 
 
147
    def bytes_part_received(self, bytes):
 
148
        if self.expecting == 'body':
 
149
            self._should_finish_body = True
 
150
            self.request_handler.accept_body(bytes)
 
151
        else:
 
152
            raise errors.SmartProtocolError(
 
153
                'Unexpected message part: bytes(%r)' % (bytes,))
 
154
 
 
155
    def end_received(self):
 
156
        if self.expecting not in ['body', 'end']:
 
157
            raise errors.SmartProtocolError(
 
158
                'End of message received prematurely (while expecting %s)'
 
159
                % (self.expecting,))
 
160
        self.expecting = 'nothing'
 
161
        self.request_handler.end_received()
 
162
        if not self.request_handler.finished_reading:
 
163
            raise errors.SmartProtocolError(
 
164
                "Complete conventional request was received, but request "
 
165
                "handler has not finished reading.")
 
166
        if not self._response_sent:
 
167
            self.responder.send_response(self.request_handler.response)
 
168
 
 
169
 
 
170
class ResponseHandler(object):
 
171
    """Abstract base class for an object that handles a smart response."""
 
172
 
 
173
    def read_response_tuple(self, expect_body=False):
 
174
        """Reads and returns the response tuple for the current request.
 
175
 
 
176
        :keyword expect_body: a boolean indicating if a body is expected in the
 
177
            response.  Some protocol versions needs this information to know
 
178
            when a response is finished.  If False, read_body_bytes should
 
179
            *not* be called afterwards.  Defaults to False.
 
180
        :returns: tuple of response arguments.
 
181
        """
 
182
        raise NotImplementedError(self.read_response_tuple)
 
183
 
 
184
    def read_body_bytes(self, count=-1):
 
185
        """Read and return some bytes from the body.
 
186
 
 
187
        :param count: if specified, read up to this many bytes.  By default,
 
188
            reads the entire body.
 
189
        :returns: str of bytes from the response body.
 
190
        """
 
191
        raise NotImplementedError(self.read_body_bytes)
 
192
 
 
193
    def read_streamed_body(self):
 
194
        """Returns an iterable that reads and returns a series of body chunks.
 
195
        """
 
196
        raise NotImplementedError(self.read_streamed_body)
 
197
 
 
198
    def cancel_read_body(self):
 
199
        """Stop expecting a body for this response.
 
200
 
 
201
        If expect_body was passed to read_response_tuple, this cancels that
 
202
        expectation (and thus finishes reading the response, allowing a new
 
203
        request to be issued).  This is useful if a response turns out to be an
 
204
        error rather than a normal result with a body.
 
205
        """
 
206
        raise NotImplementedError(self.cancel_read_body)
 
207
 
 
208
 
 
209
class ConventionalResponseHandler(MessageHandler, ResponseHandler):
 
210
 
 
211
    def __init__(self):
 
212
        MessageHandler.__init__(self)
 
213
        self.status = None
 
214
        self.args = None
 
215
        self._bytes_parts = collections.deque()
 
216
        self._body_started = False
 
217
        self._body_stream_status = None
 
218
        self._body = None
 
219
        self._body_error_args = None
 
220
        self.finished_reading = False
 
221
 
 
222
    def setProtoAndMediumRequest(self, protocol_decoder, medium_request):
 
223
        self._protocol_decoder = protocol_decoder
 
224
        self._medium_request = medium_request
 
225
 
 
226
    def byte_part_received(self, byte):
 
227
        if byte not in ['E', 'S']:
 
228
            raise errors.SmartProtocolError(
 
229
                'Unknown response status: %r' % (byte,))
 
230
        if self._body_started:
 
231
            if self._body_stream_status is not None:
 
232
                raise errors.SmartProtocolError(
 
233
                    'Unexpected byte part received: %r' % (byte,))
 
234
            self._body_stream_status = byte
 
235
        else:
 
236
            if self.status is not None:
 
237
                raise errors.SmartProtocolError(
 
238
                    'Unexpected byte part received: %r' % (byte,))
 
239
            self.status = byte
 
240
 
 
241
    def bytes_part_received(self, bytes):
 
242
        self._body_started = True
 
243
        self._bytes_parts.append(bytes)
 
244
 
 
245
    def structure_part_received(self, structure):
 
246
        if type(structure) is not tuple:
 
247
            raise errors.SmartProtocolError(
 
248
                'Args structure is not a sequence: %r' % (structure,))
 
249
        if not self._body_started:
 
250
            if self.args is not None:
 
251
                raise errors.SmartProtocolError(
 
252
                    'Unexpected structure received: %r (already got %r)'
 
253
                    % (structure, self.args))
 
254
            self.args = structure
 
255
        else:
 
256
            if self._body_stream_status != 'E':
 
257
                raise errors.SmartProtocolError(
 
258
                    'Unexpected structure received after body: %r'
 
259
                    % (structure,))
 
260
            self._body_error_args = structure
 
261
 
 
262
    def _wait_for_response_args(self):
 
263
        while self.args is None and not self.finished_reading:
 
264
            self._read_more()
 
265
 
 
266
    def _wait_for_response_end(self):
 
267
        while not self.finished_reading:
 
268
            self._read_more()
 
269
 
 
270
    def _read_more(self):
 
271
        next_read_size = self._protocol_decoder.next_read_size()
 
272
        if next_read_size == 0:
 
273
            # a complete request has been read.
 
274
            self.finished_reading = True
 
275
            self._medium_request.finished_reading()
 
276
            return
 
277
        bytes = self._medium_request.read_bytes(next_read_size)
 
278
        if bytes == '':
 
279
            # end of file encountered reading from server
 
280
            if 'hpss' in debug.debug_flags:
 
281
                mutter(
 
282
                    'decoder state: buf[:10]=%r, state_accept=%s',
 
283
                    self._protocol_decoder._get_in_buffer()[:10],
 
284
                    self._protocol_decoder.state_accept.__name__)
 
285
            raise errors.ConnectionReset(
 
286
                "please check connectivity and permissions")
 
287
        self._protocol_decoder.accept_bytes(bytes)
 
288
 
 
289
    def protocol_error(self, exception):
 
290
        # Whatever the error is, we're done with this request.
 
291
        self.finished_reading = True
 
292
        self._medium_request.finished_reading()
 
293
        raise
 
294
 
 
295
    def read_response_tuple(self, expect_body=False):
 
296
        """Read a response tuple from the wire."""
 
297
        self._wait_for_response_args()
 
298
        if not expect_body:
 
299
            self._wait_for_response_end()
 
300
        if 'hpss' in debug.debug_flags:
 
301
            mutter('   result:   %r', self.args)
 
302
        if self.status == 'E':
 
303
            self._wait_for_response_end()
 
304
            _translate_error(self.args)
 
305
        return tuple(self.args)
 
306
 
 
307
    def read_body_bytes(self, count=-1):
 
308
        """Read bytes from the body, decoding into a byte stream.
 
309
 
 
310
        We read all bytes at once to ensure we've checked the trailer for
 
311
        errors, and then feed the buffer back as read_body_bytes is called.
 
312
 
 
313
        Like the builtin file.read in Python, a count of -1 (the default) means
 
314
        read the entire body.
 
315
        """
 
316
        # TODO: we don't necessarily need to buffer the full request if count
 
317
        # != -1.  (2008/04/30, Andrew Bennetts)
 
318
        if self._body is None:
 
319
            self._wait_for_response_end()
 
320
            body_bytes = ''.join(self._bytes_parts)
 
321
            if 'hpss' in debug.debug_flags:
 
322
                mutter('              %d body bytes read', len(body_bytes))
 
323
            self._body = StringIO(body_bytes)
 
324
            self._bytes_parts = None
 
325
        return self._body.read(count)
 
326
 
 
327
    def read_streamed_body(self):
 
328
        while not self.finished_reading:
 
329
            while self._bytes_parts:
 
330
                bytes_part = self._bytes_parts.popleft()
 
331
                if 'hpss' in debug.debug_flags:
 
332
                    mutter('              %d byte part read', len(bytes_part))
 
333
                yield bytes_part
 
334
            self._read_more()
 
335
        if self._body_stream_status == 'E':
 
336
            _translate_error(self._body_error_args)
 
337
 
 
338
    def cancel_read_body(self):
 
339
        self._wait_for_response_end()
 
340
 
 
341
 
 
342
def _translate_error(error_tuple):
 
343
    # Many exceptions need some state from the requestor to be properly
 
344
    # translated (e.g. they need a branch object).  So this only translates a
 
345
    # few errors, and the rest are turned into a generic ErrorFromSmartServer.
 
346
    error_name = error_tuple[0]
 
347
    error_args = error_tuple[1:]
 
348
    if error_name == 'UnknownMethod':
 
349
        raise errors.UnknownSmartMethod(error_args[0])
 
350
    if error_name == 'LockContention':
 
351
        raise errors.LockContention('(remote lock)')
 
352
    elif error_name == 'LockFailed':
 
353
        raise errors.LockFailed(*error_args[:2])
 
354
    elif error_name == 'FileExists':
 
355
        raise errors.FileExists(error_args[0])
 
356
    elif error_name == 'NoSuchFile':
 
357
        raise errors.NoSuchFile(error_args[0])
 
358
    else:
 
359
        raise errors.ErrorFromSmartServer(error_tuple)