/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: John Arbash Meinel
  • Date: 2008-09-05 03:11:40 UTC
  • mfrom: (3691 +trunk)
  • mto: (3697.7.4 1.7)
  • mto: This revision was merged to the branch mainline in revision 3748.
  • Revision ID: john@arbash-meinel.com-20080905031140-hj0adlcf30l7i99v
Merge in bzr.dev 3691

Show diffs side-by-side

added added

removed removed

Lines of Context:
42
42
""")
43
43
 
44
44
 
 
45
# We must not read any more than 64k at a time so we don't risk "no buffer
 
46
# space available" errors on some platforms.  Windows in particular is likely
 
47
# to give error 10053 or 10055 if we read more than 64k from a socket.
 
48
_MAX_READ_SIZE = 64 * 1024
 
49
 
 
50
 
45
51
def _get_protocol_factory_for_bytes(bytes):
46
52
    """Determine the right protocol factory for 'bytes'.
47
53
 
74
80
    return protocol_factory, bytes
75
81
 
76
82
 
77
 
class SmartServerStreamMedium(object):
 
83
def _get_line(read_bytes_func):
 
84
    """Read bytes using read_bytes_func until a newline byte.
 
85
    
 
86
    This isn't particularly efficient, so should only be used when the
 
87
    expected size of the line is quite short.
 
88
    
 
89
    :returns: a tuple of two strs: (line, excess)
 
90
    """
 
91
    newline_pos = -1
 
92
    bytes = ''
 
93
    while newline_pos == -1:
 
94
        new_bytes = read_bytes_func(1)
 
95
        bytes += new_bytes
 
96
        if new_bytes == '':
 
97
            # Ran out of bytes before receiving a complete line.
 
98
            return bytes, ''
 
99
        newline_pos = bytes.find('\n')
 
100
    line = bytes[:newline_pos+1]
 
101
    excess = bytes[newline_pos+1:]
 
102
    return line, excess
 
103
 
 
104
 
 
105
class SmartMedium(object):
 
106
    """Base class for smart protocol media, both client- and server-side."""
 
107
 
 
108
    def __init__(self):
 
109
        self._push_back_buffer = None
 
110
        
 
111
    def _push_back(self, bytes):
 
112
        """Return unused bytes to the medium, because they belong to the next
 
113
        request(s).
 
114
 
 
115
        This sets the _push_back_buffer to the given bytes.
 
116
        """
 
117
        if self._push_back_buffer is not None:
 
118
            raise AssertionError(
 
119
                "_push_back called when self._push_back_buffer is %r"
 
120
                % (self._push_back_buffer,))
 
121
        if bytes == '':
 
122
            return
 
123
        self._push_back_buffer = bytes
 
124
 
 
125
    def _get_push_back_buffer(self):
 
126
        if self._push_back_buffer == '':
 
127
            raise AssertionError(
 
128
                '%s._push_back_buffer should never be the empty string, '
 
129
                'which can be confused with EOF' % (self,))
 
130
        bytes = self._push_back_buffer
 
131
        self._push_back_buffer = None
 
132
        return bytes
 
133
 
 
134
    def read_bytes(self, desired_count):
 
135
        """Read some bytes from this medium.
 
136
 
 
137
        :returns: some bytes, possibly more or less than the number requested
 
138
            in 'desired_count' depending on the medium.
 
139
        """
 
140
        if self._push_back_buffer is not None:
 
141
            return self._get_push_back_buffer()
 
142
        bytes_to_read = min(desired_count, _MAX_READ_SIZE)
 
143
        return self._read_bytes(bytes_to_read)
 
144
 
 
145
    def _read_bytes(self, count):
 
146
        raise NotImplementedError(self._read_bytes)
 
147
 
 
148
    def _get_line(self):
 
149
        """Read bytes from this request's response until a newline byte.
 
150
        
 
151
        This isn't particularly efficient, so should only be used when the
 
152
        expected size of the line is quite short.
 
153
 
 
154
        :returns: a string of bytes ending in a newline (byte 0x0A).
 
155
        """
 
156
        line, excess = _get_line(self.read_bytes)
 
157
        self._push_back(excess)
 
158
        return line
 
159
 
 
160
 
 
161
class SmartServerStreamMedium(SmartMedium):
78
162
    """Handles smart commands coming over a stream.
79
163
 
80
164
    The stream may be a pipe connected to sshd, or a tcp socket, or an
101
185
        self.backing_transport = backing_transport
102
186
        self.root_client_path = root_client_path
103
187
        self.finished = False
104
 
        self._push_back_buffer = None
105
 
 
106
 
    def _push_back(self, bytes):
107
 
        """Return unused bytes to the medium, because they belong to the next
108
 
        request(s).
109
 
 
110
 
        This sets the _push_back_buffer to the given bytes.
111
 
        """
112
 
        if self._push_back_buffer is not None:
113
 
            raise AssertionError(
114
 
                "_push_back called when self._push_back_buffer is %r"
115
 
                % (self._push_back_buffer,))
116
 
        if bytes == '':
117
 
            return
118
 
        self._push_back_buffer = bytes
119
 
 
120
 
    def _get_push_back_buffer(self):
121
 
        if self._push_back_buffer == '':
122
 
            raise AssertionError(
123
 
                '%s._push_back_buffer should never be the empty string, '
124
 
                'which can be confused with EOF' % (self,))
125
 
        bytes = self._push_back_buffer
126
 
        self._push_back_buffer = None
127
 
        return bytes
 
188
        SmartMedium.__init__(self)
128
189
 
129
190
    def serve(self):
130
191
        """Serve requests until the client disconnects."""
171
232
        """Called when an unhandled exception from the protocol occurs."""
172
233
        raise NotImplementedError(self.terminate_due_to_error)
173
234
 
174
 
    def _get_bytes(self, desired_count):
 
235
    def _read_bytes(self, desired_count):
175
236
        """Get some bytes from the medium.
176
237
 
177
238
        :param desired_count: number of bytes we want to read.
178
239
        """
179
 
        raise NotImplementedError(self._get_bytes)
180
 
 
181
 
    def _get_line(self):
182
 
        """Read bytes from this request's response until a newline byte.
183
 
        
184
 
        This isn't particularly efficient, so should only be used when the
185
 
        expected size of the line is quite short.
186
 
 
187
 
        :returns: a string of bytes ending in a newline (byte 0x0A).
188
 
        """
189
 
        newline_pos = -1
190
 
        bytes = ''
191
 
        while newline_pos == -1:
192
 
            new_bytes = self._get_bytes(1)
193
 
            bytes += new_bytes
194
 
            if new_bytes == '':
195
 
                # Ran out of bytes before receiving a complete line.
196
 
                return bytes
197
 
            newline_pos = bytes.find('\n')
198
 
        line = bytes[:newline_pos+1]
199
 
        self._push_back(bytes[newline_pos+1:])
200
 
        return line
201
 
 
 
240
        raise NotImplementedError(self._read_bytes)
 
241
 
202
242
 
203
243
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
204
244
 
215
255
 
216
256
    def _serve_one_request_unguarded(self, protocol):
217
257
        while protocol.next_read_size():
218
 
            bytes = self._get_bytes(4096)
 
258
            # We can safely try to read large chunks.  If there is less data
 
259
            # than _MAX_READ_SIZE ready, the socket wil just return a short
 
260
            # read immediately rather than block.
 
261
            bytes = self.read_bytes(_MAX_READ_SIZE)
219
262
            if bytes == '':
220
263
                self.finished = True
221
264
                return
223
266
        
224
267
        self._push_back(protocol.unused_data)
225
268
 
226
 
    def _get_bytes(self, desired_count):
227
 
        if self._push_back_buffer is not None:
228
 
            return self._get_push_back_buffer()
 
269
    def _read_bytes(self, desired_count):
229
270
        # We ignore the desired_count because on sockets it's more efficient to
230
 
        # read 4k at a time.
231
 
        return self.socket.recv(4096)
232
 
    
 
271
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
272
        return self.socket.recv(_MAX_READ_SIZE)
 
273
 
233
274
    def terminate_due_to_error(self):
234
275
        # TODO: This should log to a server log file, but no such thing
235
276
        # exists yet.  Andrew Bennetts 2006-09-29.
262
303
 
263
304
    def _serve_one_request_unguarded(self, protocol):
264
305
        while True:
 
306
            # We need to be careful not to read past the end of the current
 
307
            # request, or else the read from the pipe will block, so we use
 
308
            # protocol.next_read_size().
265
309
            bytes_to_read = protocol.next_read_size()
266
310
            if bytes_to_read == 0:
267
311
                # Finished serving this request.
268
312
                self._out.flush()
269
313
                return
270
 
            bytes = self._get_bytes(bytes_to_read)
 
314
            bytes = self.read_bytes(bytes_to_read)
271
315
            if bytes == '':
272
316
                # Connection has been closed.
273
317
                self.finished = True
275
319
                return
276
320
            protocol.accept_bytes(bytes)
277
321
 
278
 
    def _get_bytes(self, desired_count):
279
 
        if self._push_back_buffer is not None:
280
 
            return self._get_push_back_buffer()
 
322
    def _read_bytes(self, desired_count):
281
323
        return self._in.read(desired_count)
282
324
 
283
325
    def terminate_due_to_error(self):
397
439
        return self._read_bytes(count)
398
440
 
399
441
    def _read_bytes(self, count):
400
 
        """Helper for read_bytes.
 
442
        """Helper for SmartClientMediumRequest.read_bytes.
401
443
 
402
444
        read_bytes checks the state of the request to determing if bytes
403
445
        should be read. After that it hands off to _read_bytes to do the
404
446
        actual read.
 
447
        
 
448
        By default this forwards to self._medium.read_bytes because we are
 
449
        operating on the medium's stream.
405
450
        """
406
 
        raise NotImplementedError(self._read_bytes)
 
451
        return self._medium.read_bytes(count)
407
452
 
408
453
    def read_line(self):
409
 
        """Read bytes from this request's response until a newline byte.
 
454
        line = self._read_line()
 
455
        if not line.endswith('\n'):
 
456
            # end of file encountered reading from server
 
457
            raise errors.ConnectionReset(
 
458
                "please check connectivity and permissions",
 
459
                "(and try -Dhpss if further diagnosis is required)")
 
460
        return line
 
461
 
 
462
    def _read_line(self):
 
463
        """Helper for SmartClientMediumRequest.read_line.
410
464
        
411
 
        This isn't particularly efficient, so should only be used when the
412
 
        expected size of the line is quite short.
413
 
 
414
 
        :returns: a string of bytes ending in a newline (byte 0x0A).
 
465
        By default this forwards to self._medium._get_line because we are
 
466
        operating on the medium's stream.
415
467
        """
416
 
        # XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
417
 
        line = ''
418
 
        while not line or line[-1] != '\n':
419
 
            new_char = self.read_bytes(1)
420
 
            line += new_char
421
 
            if new_char == '':
422
 
                # end of file encountered reading from server
423
 
                raise errors.ConnectionReset(
424
 
                    "please check connectivity and permissions",
425
 
                    "(and try -Dhpss if further diagnosis is required)")
426
 
        return line
427
 
 
428
 
 
429
 
class SmartClientMedium(object):
 
468
        return self._medium._get_line()
 
469
 
 
470
 
 
471
class SmartClientMedium(SmartMedium):
430
472
    """Smart client is a medium for sending smart protocol requests over."""
431
473
 
432
474
    def __init__(self, base):
567
609
        """
568
610
        return SmartClientStreamMediumRequest(self)
569
611
 
570
 
    def read_bytes(self, count):
571
 
        return self._read_bytes(count)
572
 
 
573
612
 
574
613
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
575
614
    """A client medium using simple pipes.
660
699
        """See SmartClientStreamMedium.read_bytes."""
661
700
        if not self._connected:
662
701
            raise errors.MediumNotConnected(self)
663
 
        return self._read_from.read(count)
 
702
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
703
        return self._read_from.read(bytes_to_read)
664
704
 
665
705
 
666
706
# Port 4155 is the default port for bzr://, registered with IANA.
726
766
        """See SmartClientMedium.read_bytes."""
727
767
        if not self._connected:
728
768
            raise errors.MediumNotConnected(self)
729
 
        return self._socket.recv(count)
 
769
        # We ignore the desired_count because on sockets it's more efficient to
 
770
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
771
        return self._socket.recv(_MAX_READ_SIZE)
730
772
 
731
773
 
732
774
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
768
810
        """
769
811
        self._medium._flush()
770
812
 
771
 
    def _read_bytes(self, count):
772
 
        """See SmartClientMediumRequest._read_bytes.
773
 
        
774
 
        This forwards to self._medium._read_bytes because we are operating
775
 
        on the mediums stream.
776
 
        """
777
 
        return self._medium._read_bytes(count)
778