21
from __future__ import absolute_import
24
from collections.abc import deque
25
except ImportError: # python < 3.7
26
from collections import deque
22
from cStringIO import StringIO
33
import thread as _thread
42
from ...sixish import (
46
from . import message, request
47
from ...sixish import text_type
48
from ...trace import log_exception_quietly, mutter
49
from ...bencode import bdecode_as_tuple, bencode
35
from bzrlib.smart import message, request
36
from bzrlib.trace import log_exception_quietly, mutter
37
from bzrlib.bencode import bdecode_as_tuple, bencode
52
40
# Protocol version strings. These are sent as prefixes of bzr requests and
53
41
# responses to identify the protocol version being used. (There are no version
54
42
# one strings because that version doesn't send any).
55
REQUEST_VERSION_TWO = b'bzr request 2\n'
56
RESPONSE_VERSION_TWO = b'bzr response 2\n'
43
REQUEST_VERSION_TWO = 'bzr request 2\n'
44
RESPONSE_VERSION_TWO = 'bzr response 2\n'
58
MESSAGE_VERSION_THREE = b'bzr message 3 (bzr 1.6)\n'
46
MESSAGE_VERSION_THREE = 'bzr message 3 (bzr 1.6)\n'
59
47
RESPONSE_VERSION_THREE = REQUEST_VERSION_THREE = MESSAGE_VERSION_THREE
67
55
def _decode_tuple(req_line):
68
if req_line is None or req_line == b'':
56
if req_line is None or req_line == '':
70
if not req_line.endswith(b'\n'):
58
if req_line[-1] != '\n':
71
59
raise errors.SmartProtocolError("request %r not terminated" % req_line)
72
return tuple(req_line[:-1].split(b'\x01'))
60
return tuple(req_line[:-1].split('\x01'))
75
63
def _encode_tuple(args):
76
64
"""Encode the tuple args to a bytestream."""
78
if isinstance(arg, text_type):
80
return b'\x01'.join(args) + b'\n'
65
joined = '\x01'.join(args) + '\n'
66
if type(joined) is unicode:
67
# XXX: We should fix things so this never happens! -AJB, 20100304
68
mutter('response args contain unicode, should be only bytes: %r',
70
joined = joined.encode('ascii')
83
74
class Requester(object):
121
112
# support multiple chunks?
122
113
def _encode_bulk_data(self, body):
123
114
"""Encode body as a bulk data chunk."""
124
return b''.join((b'%d\n' % len(body), body, b'done\n'))
115
return ''.join(('%d\n' % len(body), body, 'done\n'))
126
117
def _serialise_offsets(self, offsets):
127
118
"""Serialise a readv offset list."""
129
120
for start, length in offsets:
130
txt.append(b'%d,%d' % (start, length))
131
return b'\n'.join(txt)
121
txt.append('%d,%d' % (start, length))
122
return '\n'.join(txt)
134
125
class SmartServerRequestProtocolOne(SmartProtocolBase):
135
126
"""Server-side encoding and decoding logic for smart version 1."""
137
128
def __init__(self, backing_transport, write_func, root_client_path='/',
139
130
self._backing_transport = backing_transport
140
131
self._root_client_path = root_client_path
141
132
self._jail_root = jail_root
142
self.unused_data = b''
133
self.unused_data = ''
143
134
self._finished = False
145
136
self._has_dispatched = False
146
137
self.request = None
147
138
self._body_decoder = None
148
139
self._write_func = write_func
150
def accept_bytes(self, data):
141
def accept_bytes(self, bytes):
151
142
"""Take bytes, and advance the internal state machine appropriately.
153
:param data: must be a byte string
144
:param bytes: must be a byte string
155
if not isinstance(data, bytes):
156
raise ValueError(data)
157
self.in_buffer += data
146
if not isinstance(bytes, str):
147
raise ValueError(bytes)
148
self.in_buffer += bytes
158
149
if not self._has_dispatched:
159
if b'\n' not in self.in_buffer:
150
if '\n' not in self.in_buffer:
160
151
# no command line yet
162
153
self._has_dispatched = True
164
first_line, self.in_buffer = self.in_buffer.split(b'\n', 1)
155
first_line, self.in_buffer = self.in_buffer.split('\n', 1)
166
157
req_args = _decode_tuple(first_line)
167
158
self.request = request.SmartServerRequestHandler(
168
159
self._backing_transport, commands=request.request_handlers,
172
163
if self.request.finished_reading:
173
164
# trivial request
174
165
self.unused_data = self.in_buffer
176
167
self._send_response(self.request.response)
177
168
except KeyboardInterrupt:
179
except errors.UnknownSmartMethod as err:
170
except errors.UnknownSmartMethod, err:
180
171
protocol_error = errors.SmartProtocolError(
181
"bad request '%s'" % (err.verb.decode('ascii'),))
172
"bad request %r" % (err.verb,))
182
173
failure = request.FailedSmartServerResponse(
183
(b'error', str(protocol_error).encode('utf-8')))
174
('error', str(protocol_error)))
184
175
self._send_response(failure)
186
except Exception as exception:
177
except Exception, exception:
187
178
# everything else: pass to client, flush, and quit
188
179
log_exception_quietly()
189
180
self._send_response(request.FailedSmartServerResponse(
190
(b'error', str(exception).encode('utf-8'))))
181
('error', str(exception))))
193
184
if self._has_dispatched:
227
218
self._write_success_or_failure_prefix(response)
228
219
self._write_func(_encode_tuple(args))
229
220
if body is not None:
230
if not isinstance(body, bytes):
221
if not isinstance(body, str):
231
222
raise ValueError(body)
232
data = self._encode_bulk_data(body)
233
self._write_func(data)
223
bytes = self._encode_bulk_data(body)
224
self._write_func(bytes)
235
226
def _write_protocol_version(self):
236
227
"""Write any prefixes this protocol requires.
267
258
def _write_success_or_failure_prefix(self, response):
268
259
"""Write the protocol specific success/failure prefix."""
269
260
if response.is_successful():
270
self._write_func(b'success\n')
261
self._write_func('success\n')
272
self._write_func(b'failed\n')
263
self._write_func('failed\n')
274
265
def _write_protocol_version(self):
275
266
r"""Write any prefixes this protocol requires.
287
278
self._write_success_or_failure_prefix(response)
288
279
self._write_func(_encode_tuple(response.args))
289
280
if response.body is not None:
290
if not isinstance(response.body, bytes):
291
raise AssertionError('body must be bytes')
281
if not isinstance(response.body, str):
282
raise AssertionError('body must be a str')
292
283
if not (response.body_stream is None):
293
284
raise AssertionError(
294
285
'body_stream and body cannot both be set')
295
data = self._encode_bulk_data(response.body)
296
self._write_func(data)
286
bytes = self._encode_bulk_data(response.body)
287
self._write_func(bytes)
297
288
elif response.body_stream is not None:
298
289
_send_stream(response.body_stream, self._write_func)
301
292
def _send_stream(stream, write_func):
302
write_func(b'chunked\n')
293
write_func('chunked\n')
303
294
_send_chunks(stream, write_func)
307
298
def _send_chunks(stream, write_func):
308
299
for chunk in stream:
309
if isinstance(chunk, bytes):
310
data = ("%x\n" % len(chunk)).encode('ascii') + chunk
300
if isinstance(chunk, str):
301
bytes = "%x\n%s" % (len(chunk), chunk)
312
303
elif isinstance(chunk, request.FailedSmartServerResponse):
314
305
_send_chunks(chunk.args, write_func)
348
339
self.finished_reading = False
349
340
self._in_buffer_list = []
350
341
self._in_buffer_len = 0
351
self.unused_data = b''
342
self.unused_data = ''
352
343
self.bytes_left = None
353
344
self._number_needed_bytes = None
355
346
def _get_in_buffer(self):
356
347
if len(self._in_buffer_list) == 1:
357
348
return self._in_buffer_list[0]
358
in_buffer = b''.join(self._in_buffer_list)
349
in_buffer = ''.join(self._in_buffer_list)
359
350
if len(in_buffer) != self._in_buffer_len:
360
351
raise AssertionError(
361
352
"Length of buffer did not match expected value: %s != %s"
374
365
# check if we can yield the bytes from just the first entry in our list
375
366
if len(self._in_buffer_list) == 0:
376
367
raise AssertionError('Callers must be sure we have buffered bytes'
377
' before calling _get_in_bytes')
368
' before calling _get_in_bytes')
378
369
if len(self._in_buffer_list[0]) > count:
379
370
return self._in_buffer_list[0][:count]
380
371
# We can't yield it from the first buffer, so collapse all buffers, and
385
376
def _set_in_buffer(self, new_buf):
386
377
if new_buf is not None:
387
if not isinstance(new_buf, bytes):
388
raise TypeError(new_buf)
389
378
self._in_buffer_list = [new_buf]
390
379
self._in_buffer_len = len(new_buf)
392
381
self._in_buffer_list = []
393
382
self._in_buffer_len = 0
395
def accept_bytes(self, new_buf):
384
def accept_bytes(self, bytes):
396
385
"""Decode as much of bytes as possible.
398
If 'new_buf' contains too much data it will be appended to
387
If 'bytes' contains too much data it will be appended to
399
388
self.unused_data.
401
390
finished_reading will be set when no more data is required. Further
402
391
data will be appended to self.unused_data.
404
if not isinstance(new_buf, bytes):
405
raise TypeError(new_buf)
406
393
# accept_bytes is allowed to change the state
407
394
self._number_needed_bytes = None
408
395
# lsprof puts a very large amount of time on this specific call for
409
396
# large readv arrays
410
self._in_buffer_list.append(new_buf)
411
self._in_buffer_len += len(new_buf)
397
self._in_buffer_list.append(bytes)
398
self._in_buffer_len += len(bytes)
413
400
# Run the function for the current state.
414
401
current_state = self.state_accept
503
490
def _state_accept_expecting_length(self):
504
491
prefix = self._extract_line()
506
493
self.error = True
507
494
self.error_in_progress = []
508
495
self._state_accept_expecting_length()
510
elif prefix == b'END':
497
elif prefix == 'END':
511
498
# We've read the end-of-body marker.
512
499
# Any further bytes are unused data, including the bytes left in
513
500
# the _in_buffer.
546
533
_StatefulDecoder.__init__(self)
547
534
self.state_accept = self._state_accept_expecting_length
548
535
self.state_read = self._state_read_no_data
550
self._trailer_buffer = b''
537
self._trailer_buffer = ''
552
539
def next_read_size(self):
553
540
if self.bytes_left is not None:
572
559
def _state_accept_expecting_length(self):
573
560
in_buf = self._get_in_buffer()
574
pos = in_buf.find(b'\n')
561
pos = in_buf.find('\n')
577
564
self.bytes_left = int(in_buf[:pos])
578
self._set_in_buffer(in_buf[pos + 1:])
565
self._set_in_buffer(in_buf[pos+1:])
579
566
self.state_accept = self._state_accept_reading_body
580
567
self.state_read = self._state_read_body_buffer
597
584
self._set_in_buffer(None)
598
585
# TODO: what if the trailer does not match "done\n"? Should this raise
599
586
# a ProtocolViolation exception?
600
if self._trailer_buffer.startswith(b'done\n'):
601
self.unused_data = self._trailer_buffer[len(b'done\n'):]
587
if self._trailer_buffer.startswith('done\n'):
588
self.unused_data = self._trailer_buffer[len('done\n'):]
602
589
self.state_accept = self._state_accept_reading_unused
603
590
self.finished_reading = True
639
626
mutter('hpss call: %s', repr(args)[1:-1])
640
627
if getattr(self._request._medium, 'base', None) is not None:
641
628
mutter(' (to %s)', self._request._medium.base)
642
self._request_start_time = osutils.perf_counter()
629
self._request_start_time = osutils.timer_func()
643
630
self._write_args(args)
644
631
self._request.finished_writing()
645
632
self._last_verb = args[0]
652
639
if 'hpss' in debug.debug_flags:
653
640
mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
654
641
if getattr(self._request._medium, '_path', None) is not None:
656
self._request._medium._path)
642
mutter(' (to %s)', self._request._medium._path)
657
643
mutter(' %d bytes', len(body))
658
self._request_start_time = osutils.perf_counter()
644
self._request_start_time = osutils.timer_func()
659
645
if 'hpssdetail' in debug.debug_flags:
660
646
mutter('hpss body content: %s', body)
661
647
self._write_args(args)
668
654
"""Make a remote call with a readv array.
670
656
The body is encoded with one line per readv offset pair. The numbers in
671
each pair are separated by a comma, and no trailing \\n is emitted.
657
each pair are separated by a comma, and no trailing \n is emitted.
673
659
if 'hpss' in debug.debug_flags:
674
660
mutter('hpss call w/readv: %s', repr(args)[1:-1])
675
661
if getattr(self._request._medium, '_path', None) is not None:
677
self._request._medium._path)
678
self._request_start_time = osutils.perf_counter()
662
mutter(' (to %s)', self._request._medium._path)
663
self._request_start_time = osutils.timer_func()
679
664
self._write_args(args)
680
665
readv_bytes = self._serialise_offsets(body)
681
666
bytes = self._encode_bulk_data(readv_bytes)
707
692
if 'hpss' in debug.debug_flags:
708
693
if self._request_start_time is not None:
709
694
mutter(' result: %6.3fs %s',
710
osutils.perf_counter() - self._request_start_time,
695
osutils.timer_func() - self._request_start_time,
711
696
repr(result)[1:-1])
712
697
self._request_start_time = None
764
749
:param verb: The verb used in that call.
765
750
:raises: UnexpectedSmartServerResponse
767
if (result_tuple == (b'error', b"Generic bzr smart protocol error: "
768
b"bad request '" + self._last_verb + b"'")
769
or result_tuple == (b'error', b"Generic bzr smart protocol error: "
770
b"bad request u'%s'" % self._last_verb)):
752
if (result_tuple == ('error', "Generic bzr smart protocol error: "
753
"bad request '%s'" % self._last_verb) or
754
result_tuple == ('error', "Generic bzr smart protocol error: "
755
"bad request u'%s'" % self._last_verb)):
771
756
# The response will have no body, so we've finished reading.
772
757
self._request.finished_reading()
773
758
raise errors.UnknownSmartMethod(self._last_verb)
785
770
while not _body_decoder.finished_reading:
786
771
bytes = self._request.read_bytes(_body_decoder.next_read_size())
788
773
# end of file encountered reading from server
789
774
raise errors.ConnectionReset(
790
775
"Connection lost while reading response body.")
791
776
_body_decoder.accept_bytes(bytes)
792
777
self._request.finished_reading()
793
self._body_buffer = BytesIO(_body_decoder.read_pending_data())
778
self._body_buffer = StringIO(_body_decoder.read_pending_data())
794
779
# XXX: TODO check the trailer result.
795
780
if 'hpss' in debug.debug_flags:
796
781
mutter(' %d body bytes read',
804
789
def query_version(self):
805
790
"""Return protocol version number of the server."""
807
792
resp = self.read_response_tuple()
808
if resp == (b'ok', b'1'):
793
if resp == ('ok', '1'):
810
elif resp == (b'ok', b'2'):
795
elif resp == ('ok', '2'):
813
798
raise errors.SmartProtocolError("bad response %r" % (resp,))
845
830
response_status = self._request.read_line()
846
831
result = SmartClientRequestProtocolOne._read_response_tuple(self)
847
832
self._response_is_unknown_method(result)
848
if response_status == b'success\n':
833
if response_status == 'success\n':
849
834
self.response_status = True
850
835
if not expect_body:
851
836
self._request.finished_reading()
853
elif response_status == b'failed\n':
838
elif response_status == 'failed\n':
854
839
self.response_status = False
855
840
self._request.finished_reading()
856
841
raise errors.ErrorFromSmartServer(result)
873
858
_body_decoder = ChunkedBodyDecoder()
874
859
while not _body_decoder.finished_reading:
875
860
bytes = self._request.read_bytes(_body_decoder.next_read_size())
877
862
# end of file encountered reading from server
878
863
raise errors.ConnectionReset(
879
864
"Connection lost while reading streamed body.")
880
865
_body_decoder.accept_bytes(bytes)
881
866
for body_bytes in iter(_body_decoder.read_next_chunk, None):
882
if 'hpss' in debug.debug_flags and isinstance(body_bytes, str):
867
if 'hpss' in debug.debug_flags and type(body_bytes) is str:
883
868
mutter(' %d byte chunk read',
892
877
backing_transport, commands=request.request_handlers,
893
878
root_client_path=root_client_path, jail_root=jail_root)
894
879
responder = ProtocolThreeResponder(write_func)
895
message_handler = message.ConventionalRequestHandler(
896
request_handler, responder)
880
message_handler = message.ConventionalRequestHandler(request_handler, responder)
897
881
return ProtocolThreeDecoder(message_handler)
923
907
_StatefulDecoder.accept_bytes(self, bytes)
924
908
except KeyboardInterrupt:
926
except errors.SmartMessageHandlerError as exception:
910
except errors.SmartMessageHandlerError, exception:
927
911
# We do *not* set self.decoding_failed here. The message handler
928
912
# has raised an error, but the decoder is still able to parse bytes
929
913
# and determine when this message ends.
933
917
# The state machine is ready to continue decoding, but the
934
918
# exception has interrupted the loop that runs the state machine.
935
919
# So we call accept_bytes again to restart it.
936
self.accept_bytes(b'')
937
except Exception as exception:
920
self.accept_bytes('')
921
except Exception, exception:
938
922
# The decoder itself has raised an exception. We cannot continue
940
924
self.decoding_failed = True
1009
993
def _state_accept_expecting_headers(self):
1010
994
decoded = self._extract_prefixed_bencoded_data()
1011
if not isinstance(decoded, dict):
995
if type(decoded) is not dict:
1012
996
raise errors.SmartProtocolError(
1013
997
'Header object %r is not a dict' % (decoded,))
1014
998
self.state_accept = self._state_accept_expecting_message_part
1020
1004
def _state_accept_expecting_message_part(self):
1021
1005
message_part_kind = self._extract_single_byte()
1022
if message_part_kind == b'o':
1006
if message_part_kind == 'o':
1023
1007
self.state_accept = self._state_accept_expecting_one_byte
1024
elif message_part_kind == b's':
1008
elif message_part_kind == 's':
1025
1009
self.state_accept = self._state_accept_expecting_structure
1026
elif message_part_kind == b'b':
1010
elif message_part_kind == 'b':
1027
1011
self.state_accept = self._state_accept_expecting_bytes
1028
elif message_part_kind == b'e':
1012
elif message_part_kind == 'e':
1031
1015
raise errors.SmartProtocolError(
1097
1081
self._real_write_func = write_func
1099
1083
def _write_func(self, bytes):
1084
# TODO: It is probably more appropriate to use sum(map(len, _buf))
1085
# for total number of bytes to write, rather than buffer based on
1086
# the number of write() calls
1100
1087
# TODO: Another possibility would be to turn this into an async model.
1101
1088
# Where we let another thread know that we have some bytes if
1102
1089
# they want it, but we don't actually block for it
1117
1104
"""Serialise a readv offset list."""
1119
1106
for start, length in offsets:
1120
txt.append(b'%d,%d' % (start, length))
1121
return b'\n'.join(txt)
1107
txt.append('%d,%d' % (start, length))
1108
return '\n'.join(txt)
1123
1110
def _write_protocol_version(self):
1124
1111
self._write_func(MESSAGE_VERSION_THREE)
1132
1119
self._write_prefixed_bencode(headers)
1134
1121
def _write_structure(self, args):
1135
self._write_func(b's')
1122
self._write_func('s')
1137
1124
for arg in args:
1138
if isinstance(arg, text_type):
1125
if type(arg) is unicode:
1139
1126
utf8_args.append(arg.encode('utf8'))
1141
1128
utf8_args.append(arg)
1142
1129
self._write_prefixed_bencode(utf8_args)
1144
1131
def _write_end(self):
1145
self._write_func(b'e')
1132
self._write_func('e')
1148
1135
def _write_prefixed_body(self, bytes):
1149
self._write_func(b'b')
1136
self._write_func('b')
1150
1137
self._write_func(struct.pack('!L', len(bytes)))
1151
1138
self._write_func(bytes)
1153
1140
def _write_chunked_body_start(self):
1154
self._write_func(b'oC')
1141
self._write_func('oC')
1156
1143
def _write_error_status(self):
1157
self._write_func(b'oE')
1144
self._write_func('oE')
1159
1146
def _write_success_status(self):
1160
self._write_func(b'oS')
1147
self._write_func('oS')
1163
1150
class ProtocolThreeResponder(_ProtocolThreeEncoder):
1165
1152
def __init__(self, write_func):
1166
1153
_ProtocolThreeEncoder.__init__(self, write_func)
1167
1154
self.response_sent = False
1169
b'Software version': breezy.__version__.encode('utf-8')}
1155
self._headers = {'Software version': bzrlib.__version__}
1170
1156
if 'hpss' in debug.debug_flags:
1171
self._thread_id = _thread.get_ident()
1157
self._thread_id = thread.get_ident()
1172
1158
self._response_start_time = None
1174
1160
def _trace(self, action, message, extra_bytes=None, include_time=False):
1175
1161
if self._response_start_time is None:
1176
self._response_start_time = osutils.perf_counter()
1162
self._response_start_time = osutils.timer_func()
1177
1163
if include_time:
1178
t = '%5.3fs ' % (osutils.perf_counter() - self._response_start_time)
1164
t = '%5.3fs ' % (time.clock() - self._response_start_time)
1181
1167
if extra_bytes is None:
1194
1180
% (exception,))
1195
1181
if isinstance(exception, errors.UnknownSmartMethod):
1196
1182
failure = request.FailedSmartServerResponse(
1197
(b'UnknownMethod', exception.verb))
1183
('UnknownMethod', exception.verb))
1198
1184
self.send_response(failure)
1200
1186
if 'hpss' in debug.debug_flags:
1203
1189
self._write_protocol_version()
1204
1190
self._write_headers(self._headers)
1205
1191
self._write_error_status()
1206
self._write_structure(
1207
(b'error', str(exception).encode('utf-8', 'replace')))
1192
self._write_structure(('error', str(exception)))
1208
1193
self._write_end()
1210
1195
def send_response(self, response):
1318
1302
base = getattr(self._medium_request._medium, 'base', None)
1319
1303
if base is not None:
1320
1304
mutter(' (to %s)', base)
1321
self._request_start_time = osutils.perf_counter()
1305
self._request_start_time = osutils.timer_func()
1322
1306
self._write_protocol_version()
1323
1307
self._write_headers(self._headers)
1324
1308
self._write_structure(args)
1336
1320
if path is not None:
1337
1321
mutter(' (to %s)', path)
1338
1322
mutter(' %d bytes', len(body))
1339
self._request_start_time = osutils.perf_counter()
1323
self._request_start_time = osutils.timer_func()
1340
1324
self._write_protocol_version()
1341
1325
self._write_headers(self._headers)
1342
1326
self._write_structure(args)
1348
1332
"""Make a remote call with a readv array.
1350
1334
The body is encoded with one line per readv offset pair. The numbers in
1351
each pair are separated by a comma, and no trailing \\n is emitted.
1335
each pair are separated by a comma, and no trailing \n is emitted.
1353
1337
if 'hpss' in debug.debug_flags:
1354
1338
mutter('hpss call w/readv: %s', repr(args)[1:-1])
1355
1339
path = getattr(self._medium_request._medium, '_path', None)
1356
1340
if path is not None:
1357
1341
mutter(' (to %s)', path)
1358
self._request_start_time = osutils.perf_counter()
1342
self._request_start_time = osutils.timer_func()
1359
1343
self._write_protocol_version()
1360
1344
self._write_headers(self._headers)
1361
1345
self._write_structure(args)
1372
1356
path = getattr(self._medium_request._medium, '_path', None)
1373
1357
if path is not None:
1374
1358
mutter(' (to %s)', path)
1375
self._request_start_time = osutils.perf_counter()
1376
self.body_stream_started = False
1359
self._request_start_time = osutils.timer_func()
1377
1360
self._write_protocol_version()
1378
1361
self._write_headers(self._headers)
1379
1362
self._write_structure(args)
1381
1364
# have finished sending the stream. We would notice at the end
1382
1365
# anyway, but if the medium can deliver it early then it's good
1383
1366
# to short-circuit the whole request...
1384
# Provoke any ConnectionReset failures before we start the body stream.
1386
self.body_stream_started = True
1387
1367
for exc_info, part in _iter_with_errors(stream):
1388
1368
if exc_info is not None:
1389
1369
# Iterating the stream failed. Cleanly abort the request.
1390
1370
self._write_error_status()
1391
1371
# Currently the client unconditionally sends ('error',) as the
1393
self._write_structure((b'error',))
1373
self._write_structure(('error',))
1394
1374
self._write_end()
1395
1375
self._medium_request.finished_writing()
1376
raise exc_info[0], exc_info[1], exc_info[2]
1401
1378
self._write_prefixed_body(part)
1403
1380
self._write_end()
1404
1381
self._medium_request.finished_writing()