/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2008-07-22 01:24:53 UTC
  • mfrom: (3565.1.4 hpss-v3-memory-error)
  • Revision ID: pqm@pqm.ubuntu.com-20080722012453-i58b5mk2wayinusg
Make sure we never read more than 64k at a time from a smart medium.
        (Andrew Bennetts, #246180)

Show diffs side-by-side

added added

removed removed

Lines of Context:
42
42
""")
43
43
 
44
44
 
 
45
# We must not read any more than 64k at a time so we don't risk "no buffer
 
46
# space available" errors on some platforms.  Windows in particular is likely
 
47
# to give error 10053 or 10055 if we read more than 64k from a socket.
 
48
_MAX_READ_SIZE = 64 * 1024
 
49
 
 
50
 
45
51
def _get_protocol_factory_for_bytes(bytes):
46
52
    """Determine the right protocol factory for 'bytes'.
47
53
 
74
80
    return protocol_factory, bytes
75
81
 
76
82
 
77
 
class SmartServerStreamMedium(object):
 
83
class SmartMedium(object):
 
84
    """Base class for smart protocol media, both client- and server-side."""
 
85
 
 
86
    def __init__(self):
 
87
        self._push_back_buffer = None
 
88
        
 
89
    def _push_back(self, bytes):
 
90
        """Return unused bytes to the medium, because they belong to the next
 
91
        request(s).
 
92
 
 
93
        This sets the _push_back_buffer to the given bytes.
 
94
        """
 
95
        if self._push_back_buffer is not None:
 
96
            raise AssertionError(
 
97
                "_push_back called when self._push_back_buffer is %r"
 
98
                % (self._push_back_buffer,))
 
99
        if bytes == '':
 
100
            return
 
101
        self._push_back_buffer = bytes
 
102
 
 
103
    def _get_push_back_buffer(self):
 
104
        if self._push_back_buffer == '':
 
105
            raise AssertionError(
 
106
                '%s._push_back_buffer should never be the empty string, '
 
107
                'which can be confused with EOF' % (self,))
 
108
        bytes = self._push_back_buffer
 
109
        self._push_back_buffer = None
 
110
        return bytes
 
111
 
 
112
    def read_bytes(self, desired_count):
 
113
        """Read some bytes from this medium.
 
114
 
 
115
        :returns: some bytes, possibly more or less than the number requested
 
116
            in 'desired_count' depending on the medium.
 
117
        """
 
118
        if self._push_back_buffer is not None:
 
119
            return self._get_push_back_buffer()
 
120
        bytes_to_read = min(desired_count, _MAX_READ_SIZE)
 
121
        return self._read_bytes(bytes_to_read)
 
122
 
 
123
    def _read_bytes(self, count):
 
124
        raise NotImplementedError(self._read_bytes)
 
125
 
 
126
    def _get_line(self):
 
127
        """Read bytes from this request's response until a newline byte.
 
128
        
 
129
        This isn't particularly efficient, so should only be used when the
 
130
        expected size of the line is quite short.
 
131
 
 
132
        :returns: a string of bytes ending in a newline (byte 0x0A).
 
133
        """
 
134
        newline_pos = -1
 
135
        bytes = ''
 
136
        while newline_pos == -1:
 
137
            new_bytes = self.read_bytes(1)
 
138
            bytes += new_bytes
 
139
            if new_bytes == '':
 
140
                # Ran out of bytes before receiving a complete line.
 
141
                return bytes
 
142
            newline_pos = bytes.find('\n')
 
143
        line = bytes[:newline_pos+1]
 
144
        self._push_back(bytes[newline_pos+1:])
 
145
        return line
 
146
 
 
147
 
 
148
class SmartServerStreamMedium(SmartMedium):
78
149
    """Handles smart commands coming over a stream.
79
150
 
80
151
    The stream may be a pipe connected to sshd, or a tcp socket, or an
101
172
        self.backing_transport = backing_transport
102
173
        self.root_client_path = root_client_path
103
174
        self.finished = False
104
 
        self._push_back_buffer = None
105
 
 
106
 
    def _push_back(self, bytes):
107
 
        """Return unused bytes to the medium, because they belong to the next
108
 
        request(s).
109
 
 
110
 
        This sets the _push_back_buffer to the given bytes.
111
 
        """
112
 
        if self._push_back_buffer is not None:
113
 
            raise AssertionError(
114
 
                "_push_back called when self._push_back_buffer is %r"
115
 
                % (self._push_back_buffer,))
116
 
        if bytes == '':
117
 
            return
118
 
        self._push_back_buffer = bytes
119
 
 
120
 
    def _get_push_back_buffer(self):
121
 
        if self._push_back_buffer == '':
122
 
            raise AssertionError(
123
 
                '%s._push_back_buffer should never be the empty string, '
124
 
                'which can be confused with EOF' % (self,))
125
 
        bytes = self._push_back_buffer
126
 
        self._push_back_buffer = None
127
 
        return bytes
 
175
        SmartMedium.__init__(self)
128
176
 
129
177
    def serve(self):
130
178
        """Serve requests until the client disconnects."""
171
219
        """Called when an unhandled exception from the protocol occurs."""
172
220
        raise NotImplementedError(self.terminate_due_to_error)
173
221
 
174
 
    def _get_bytes(self, desired_count):
 
222
    def _read_bytes(self, desired_count):
175
223
        """Get some bytes from the medium.
176
224
 
177
225
        :param desired_count: number of bytes we want to read.
178
226
        """
179
 
        raise NotImplementedError(self._get_bytes)
180
 
 
181
 
    def _get_line(self):
182
 
        """Read bytes from this request's response until a newline byte.
183
 
        
184
 
        This isn't particularly efficient, so should only be used when the
185
 
        expected size of the line is quite short.
186
 
 
187
 
        :returns: a string of bytes ending in a newline (byte 0x0A).
188
 
        """
189
 
        newline_pos = -1
190
 
        bytes = ''
191
 
        while newline_pos == -1:
192
 
            new_bytes = self._get_bytes(1)
193
 
            bytes += new_bytes
194
 
            if new_bytes == '':
195
 
                # Ran out of bytes before receiving a complete line.
196
 
                return bytes
197
 
            newline_pos = bytes.find('\n')
198
 
        line = bytes[:newline_pos+1]
199
 
        self._push_back(bytes[newline_pos+1:])
200
 
        return line
201
 
 
 
227
        raise NotImplementedError(self._read_bytes)
 
228
 
202
229
 
203
230
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
204
231
 
215
242
 
216
243
    def _serve_one_request_unguarded(self, protocol):
217
244
        while protocol.next_read_size():
218
 
            bytes = self._get_bytes(4096)
 
245
            # We can safely try to read large chunks.  If there is less data
 
246
            # than _MAX_READ_SIZE ready, the socket wil just return a short
 
247
            # read immediately rather than block.
 
248
            bytes = self.read_bytes(_MAX_READ_SIZE)
219
249
            if bytes == '':
220
250
                self.finished = True
221
251
                return
223
253
        
224
254
        self._push_back(protocol.unused_data)
225
255
 
226
 
    def _get_bytes(self, desired_count):
227
 
        if self._push_back_buffer is not None:
228
 
            return self._get_push_back_buffer()
 
256
    def _read_bytes(self, desired_count):
229
257
        # We ignore the desired_count because on sockets it's more efficient to
230
 
        # read 4k at a time.
231
 
        return self.socket.recv(4096)
232
 
    
 
258
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
259
        return self.socket.recv(_MAX_READ_SIZE)
 
260
 
233
261
    def terminate_due_to_error(self):
234
262
        # TODO: This should log to a server log file, but no such thing
235
263
        # exists yet.  Andrew Bennetts 2006-09-29.
262
290
 
263
291
    def _serve_one_request_unguarded(self, protocol):
264
292
        while True:
 
293
            # We need to be careful not to read past the end of the current
 
294
            # request, or else the read from the pipe will block, so we use
 
295
            # protocol.next_read_size().
265
296
            bytes_to_read = protocol.next_read_size()
266
297
            if bytes_to_read == 0:
267
298
                # Finished serving this request.
268
299
                self._out.flush()
269
300
                return
270
 
            bytes = self._get_bytes(bytes_to_read)
 
301
            bytes = self.read_bytes(bytes_to_read)
271
302
            if bytes == '':
272
303
                # Connection has been closed.
273
304
                self.finished = True
275
306
                return
276
307
            protocol.accept_bytes(bytes)
277
308
 
278
 
    def _get_bytes(self, desired_count):
279
 
        if self._push_back_buffer is not None:
280
 
            return self._get_push_back_buffer()
 
309
    def _read_bytes(self, desired_count):
281
310
        return self._in.read(desired_count)
282
311
 
283
312
    def terminate_due_to_error(self):
397
426
        return self._read_bytes(count)
398
427
 
399
428
    def _read_bytes(self, count):
400
 
        """Helper for read_bytes.
 
429
        """Helper for SmartClientMediumRequest.read_bytes.
401
430
 
402
431
        read_bytes checks the state of the request to determing if bytes
403
432
        should be read. After that it hands off to _read_bytes to do the
404
433
        actual read.
 
434
        
 
435
        By default this forwards to self._medium.read_bytes because we are
 
436
        operating on the medium's stream.
405
437
        """
406
 
        raise NotImplementedError(self._read_bytes)
 
438
        return self._medium.read_bytes(count)
407
439
 
408
440
    def read_line(self):
409
 
        """Read bytes from this request's response until a newline byte.
410
 
        
411
 
        This isn't particularly efficient, so should only be used when the
412
 
        expected size of the line is quite short.
413
 
 
414
 
        :returns: a string of bytes ending in a newline (byte 0x0A).
415
 
        """
416
 
        # XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
417
 
        line = ''
418
 
        while not line or line[-1] != '\n':
419
 
            new_char = self.read_bytes(1)
420
 
            line += new_char
421
 
            if new_char == '':
422
 
                # end of file encountered reading from server
423
 
                raise errors.ConnectionReset(
424
 
                    "please check connectivity and permissions",
425
 
                    "(and try -Dhpss if further diagnosis is required)")
 
441
        line = self._medium._get_line()
 
442
        if not line.endswith('\n'):
 
443
            # end of file encountered reading from server
 
444
            raise errors.ConnectionReset(
 
445
                "please check connectivity and permissions",
 
446
                "(and try -Dhpss if further diagnosis is required)")
426
447
        return line
427
448
 
428
449
 
429
 
class SmartClientMedium(object):
 
450
class SmartClientMedium(SmartMedium):
430
451
    """Smart client is a medium for sending smart protocol requests over."""
431
452
 
432
453
    def __init__(self, base):
567
588
        """
568
589
        return SmartClientStreamMediumRequest(self)
569
590
 
570
 
    def read_bytes(self, count):
571
 
        return self._read_bytes(count)
572
 
 
573
591
 
574
592
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
575
593
    """A client medium using simple pipes.
660
678
        """See SmartClientStreamMedium.read_bytes."""
661
679
        if not self._connected:
662
680
            raise errors.MediumNotConnected(self)
663
 
        return self._read_from.read(count)
 
681
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
682
        return self._read_from.read(bytes_to_read)
664
683
 
665
684
 
666
685
# Port 4155 is the default port for bzr://, registered with IANA.
726
745
        """See SmartClientMedium.read_bytes."""
727
746
        if not self._connected:
728
747
            raise errors.MediumNotConnected(self)
729
 
        return self._socket.recv(count)
 
748
        # We ignore the desired_count because on sockets it's more efficient to
 
749
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
750
        return self._socket.recv(_MAX_READ_SIZE)
730
751
 
731
752
 
732
753
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
768
789
        """
769
790
        self._medium._flush()
770
791
 
771
 
    def _read_bytes(self, count):
772
 
        """See SmartClientMediumRequest._read_bytes.
773
 
        
774
 
        This forwards to self._medium._read_bytes because we are operating
775
 
        on the mediums stream.
776
 
        """
777
 
        return self._medium._read_bytes(count)
778