/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar
3195.3.17 by Andrew Bennetts
Some tests now passing using protocol 3.
1
# Copyright (C) 8 Canonical Ltd
2
#
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
# GNU General Public License for more details.
12
#
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
3195.3.24 by Andrew Bennetts
Implement read_streamed_body on ConventionalResponseHandler.
17
import collections
3195.3.17 by Andrew Bennetts
Some tests now passing using protocol 3.
18
from cStringIO import StringIO
19
20
from bzrlib import errors
21
22
class MessageHandler(object):
23
24
    def __init__(self):
25
        self.headers = None
26
27
    def headers_received(self, headers):
28
        self.headers = headers
29
30
    def byte_part_received(self, byte):
31
        raise NotImplementedError(self.byte_received)
32
33
    def bytes_part_received(self, bytes):
34
        raise NotImplementedError(self.bytes_received)
35
36
    def structure_part_received(self, structure):
37
        raise NotImplementedError(self.bytes_received)
38
39
    def protocol_error(self, exception):
40
        """Called when there is a protocol decoding error."""
41
        raise
42
    
43
    def end_received(self):
44
        # XXX
45
        pass
46
47
48
class ConventionalRequestHandler(MessageHandler):
49
50
    def __init__(self, request_handler, responder):
51
        MessageHandler.__init__(self)
52
        self.request_handler = request_handler
53
        self.responder = responder
54
        self.args_received = False
55
#        self.args = None
56
#        self.error = None
57
#        self.prefixed_body = None
58
#        self.body_stream = None
59
60
    def protocol_error(self, exception):
61
        self.responder.send_error(exception)
62
63
    def byte_part_received(self, byte):
64
        raise errors.SmartProtocolError(
65
            'Unexpected message part: byte(%r)' % (byte,))
66
67
    def structure_part_received(self, structure):
68
        if self.args_received:
69
            raise errors.SmartProtocolError(
70
                'Unexpected message part: structure(%r)' % (structure,))
71
        self.args_received = True
72
        self.request_handler.dispatch_command(structure[0], structure[1:])
73
        if self.request_handler.finished_reading:
74
            self.responder.send_response(self.request_handler.response)
75
76
    def bytes_part_received(self, bytes):
77
        # XXX: this API requires monolithic bodies to be buffered
78
        # XXX: how to distinguish between a monolithic body and a chunk stream?
79
        #      Hmm, I guess the request handler knows which it is expecting
80
        #      (once the args have been received), so it should just deal?  We
81
        #      don't yet have requests that expect a stream anyway.
82
        #      *Maybe* a one-byte 'c' or 'm' (chunk or monolithic) flag before
83
        #      first bytes part?
3195.3.18 by Andrew Bennetts
call_with_body_bytes now works with v3 (e.g. test_copy_content_remote_to_local passes). Lots of debugging cruft, though.
84
        self.request_handler.accept_body(bytes)
85
        self.request_handler.end_of_body()
86
        assert self.request_handler.finished_reading
87
        self.responder.send_response(self.request_handler.response)
3195.3.17 by Andrew Bennetts
Some tests now passing using protocol 3.
88
89
    def end_received(self):
90
        # XXX
91
        pass
92
93
94
class ConventionalResponseHandler(MessageHandler):
95
96
    def __init__(self):
97
        MessageHandler.__init__(self)
98
        self.status = None
99
        self.args = None
3195.3.24 by Andrew Bennetts
Implement read_streamed_body on ConventionalResponseHandler.
100
        self._bytes_parts = collections.deque()
3245.4.5 by Andrew Bennetts
Implement interrupting body streams with an error.
101
        self._body_started = False
102
        self._body_stream_status = None
3195.3.17 by Andrew Bennetts
Some tests now passing using protocol 3.
103
        self._body = None
3245.4.5 by Andrew Bennetts
Implement interrupting body streams with an error.
104
        self._body_error_args = None
3195.3.17 by Andrew Bennetts
Some tests now passing using protocol 3.
105
        self.finished_reading = False
106
107
    def setProtoAndMedium(self, protocol_decoder, medium):
108
        self._protocol_decoder = protocol_decoder
109
        self._medium = medium
110
111
    def byte_part_received(self, byte):
112
        if byte not in ['E', 'S']:
113
            raise errors.SmartProtocolError(
114
                'Unknown response status: %r' % (byte,))
3245.4.5 by Andrew Bennetts
Implement interrupting body streams with an error.
115
        if self._body_started:
116
            if self._body_stream_status is not None:
117
                raise errors.SmartProtocolError(
118
                    'Unexpected byte part received: %r' % (byte,))
119
            self._body_stream_status = byte
120
        else:
121
            if self.status is not None:
122
                raise errors.SmartProtocolError(
123
                    'Unexpected byte part received: %r' % (byte,))
124
            self.status = byte
3195.3.17 by Andrew Bennetts
Some tests now passing using protocol 3.
125
126
    def bytes_part_received(self, bytes):
3245.4.5 by Andrew Bennetts
Implement interrupting body streams with an error.
127
        self._body_started = True
3195.3.17 by Andrew Bennetts
Some tests now passing using protocol 3.
128
        self._bytes_parts.append(bytes)
129
130
    def structure_part_received(self, structure):
3245.4.5 by Andrew Bennetts
Implement interrupting body streams with an error.
131
        if type(structure) is not list:
3195.3.17 by Andrew Bennetts
Some tests now passing using protocol 3.
132
            raise errors.SmartProtocolError(
3245.4.5 by Andrew Bennetts
Implement interrupting body streams with an error.
133
                'Args structure is not a sequence: %r' % (structure,))
134
        structure = tuple(structure)
135
        if not self._body_started:
136
            if self.args is not None:
137
                raise errors.SmartProtocolError(
138
                    'Unexpected structure received: %r (already got %r)'
139
                    % (structure, self.args))
140
            self.args = structure
141
        else:
142
            if self._body_stream_status != 'E':
143
                raise errors.SmartProtocolError(
144
                    'Unexpected structure received after body: %r'
145
                    % (structure,))
146
            self._body_error_args = structure
3195.3.17 by Andrew Bennetts
Some tests now passing using protocol 3.
147
148
    def _wait_for_response_args(self):
149
        while self.args is None and not self.finished_reading:
150
            self._read_more()
151
152
    def _wait_for_response_end(self):
153
        while not self.finished_reading:
154
            self._read_more()
155
156
    def _read_more(self):
157
        next_read_size = self._protocol_decoder.next_read_size()
158
        if next_read_size == 0:
159
            # a complete request has been read.
160
            self.finished_reading = True
161
            self._medium.finished_reading()
162
            return
163
        bytes = self._medium.read_bytes(next_read_size)
164
        if bytes == '':
165
            # end of file encountered reading from server
166
            raise errors.ConnectionReset(
167
                "please check connectivity and permissions",
168
                "(and try -Dhpss if further diagnosis is required)")
169
        self._protocol_decoder.accept_bytes(bytes)
170
171
    def read_response_tuple(self, expect_body=False):
172
        """Read a response tuple from the wire.
173
174
        The expect_body flag is ignored.
175
        """
176
        self._wait_for_response_args()
3195.3.18 by Andrew Bennetts
call_with_body_bytes now works with v3 (e.g. test_copy_content_remote_to_local passes). Lots of debugging cruft, though.
177
        if not expect_body:
178
            self._wait_for_response_end()
3195.3.23 by Andrew Bennetts
Improve the error handling, fixing more tests.
179
        #if self.status == 'E':
180
        #    xxx_translate_error() # XXX
3195.3.17 by Andrew Bennetts
Some tests now passing using protocol 3.
181
        return tuple(self.args)
182
183
    def read_body_bytes(self, count=-1):
184
        """Read bytes from the body, decoding into a byte stream.
185
        
186
        We read all bytes at once to ensure we've checked the trailer for 
187
        errors, and then feed the buffer back as read_body_bytes is called.
188
        """
189
        # XXX: don't buffer the full request
190
        if self._body is None:
3195.3.18 by Andrew Bennetts
call_with_body_bytes now works with v3 (e.g. test_copy_content_remote_to_local passes). Lots of debugging cruft, though.
191
            self._wait_for_response_end()
3195.3.17 by Andrew Bennetts
Some tests now passing using protocol 3.
192
            self._body = StringIO(''.join(self._bytes_parts))
193
            self._bytes_parts = None
194
        return self._body.read(count)
195
3195.3.24 by Andrew Bennetts
Implement read_streamed_body on ConventionalResponseHandler.
196
    def read_streamed_body(self):
197
        # XXX: this doesn't implement error handling for interrupted streams.
198
        while not self.finished_reading:
199
            while self._bytes_parts:
200
                yield self._bytes_parts.popleft()
201
            self._read_more()
3245.4.5 by Andrew Bennetts
Implement interrupting body streams with an error.
202
        if self._body_stream_status == 'E':
203
            _translate_error(self._body_error_args)
3195.3.24 by Andrew Bennetts
Implement read_streamed_body on ConventionalResponseHandler.
204
3195.3.17 by Andrew Bennetts
Some tests now passing using protocol 3.
205
    def cancel_read_body(self):
206
        self._wait_for_response_end()
3245.4.5 by Andrew Bennetts
Implement interrupting body streams with an error.
207
208
209
def _translate_error(error_tuple):
210
    # XXX: Hmm!  Need state from the request.  Hmm.
211
    error_name = error_tuple[0]
212
    error_args = error_tuple[1:]
213
    if error_name == 'LockContention':
214
        raise errors.LockContention('(remote lock)')
215
    elif error_name == 'LockFailed':
216
        raise errors.LockContention(*error_args[:2])
217
    else:
218
        raise errors.ErrorFromSmartServer(error_tuple)
219