/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar
2400.1.3 by Andrew Bennetts
Split smart transport code into several separate modules.
1
# Copyright (C) 2006,2007 Canonical Ltd
2
#
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
# GNU General Public License for more details.
12
#
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
17
"""The 'medium' layer for the smart servers and clients.
18
19
"Medium" here is the noun meaning "a means of transmission", not the adjective
20
for "the quality between big and small."
21
22
Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
23
over SSH), and pass them to and from the protocol logic.  See the overview in
24
bzrlib/transport/smart/__init__.py.
25
"""
26
27
import os
28
import socket
29
import sys
30
from bzrlib import errors
31
from bzrlib.smart.protocol import SmartServerRequestProtocolOne
32
33
try:
34
    from bzrlib.transport import ssh
35
except errors.ParamikoNotPresent:
36
    # no paramiko.  SmartSSHClientMedium will break.
37
    pass
38
39
40
class SmartClientMediumRequest(object):
    """A single request/response exchange carried over a client medium.

    The bytes of the request are handed over with accept_bytes(); once
    finished_writing() has been called the response is consumed with
    read_bytes(), and finished_reading() ends the exchange.

    Typical usage:
    request.accept_bytes('123')
    request.finished_writing()
    result = request.read_bytes(3)
    request.finished_reading()

    Whether more than one request may be in progress at a time is decided by
    the concrete medium; consult the medium in use for its concurrency and
    pipelining rules.  Instances are normally obtained from a medium's
    get_request() method.

    Subclasses supply the actual I/O by overriding _accept_bytes,
    _finished_writing, _read_bytes and _finished_reading; the public methods
    here only enforce the writing -> reading -> done ordering.
    """

    def __init__(self, medium):
        """Create a request bound to medium, starting in the writing phase."""
        self._medium = medium
        # The lifecycle is tracked with a plain string constant.  The legal
        # values are "writing", "reading" and "done", visited in that order.
        self._state = "writing"

    def _ensure_writing(self):
        """Raise WritingCompleted unless the request is still being written."""
        if self._state == "writing":
            return
        raise errors.WritingCompleted(self)

    def _ensure_reading(self):
        """Raise unless the request is currently in its reading phase.

        :raises WritingNotComplete: if finished_writing() was not called yet.
        :raises ReadingCompleted: if finished_reading() was already called.
        """
        current = self._state
        if current == "writing":
            raise errors.WritingNotComplete(self)
        if current != "reading":
            raise errors.ReadingCompleted(self)

    def accept_bytes(self, bytes):
        """Add bytes to the body of this request.

        Only legal while the request is being written, i.e. before
        finished_writing() has been called.  The medium decides whether the
        bytes are transmitted straight away; message based media tend to
        buffer them until finished_writing().

        :param bytes: A bytestring.
        :raises WritingCompleted: if the writing phase is already over.
        """
        self._ensure_writing()
        self._accept_bytes(bytes)

    def _accept_bytes(self, bytes):
        """Hook for accept_bytes: actually take the bytes.

        accept_bytes has already verified that the request is in the
        writing phase when this is invoked.  Subclasses must override it.
        """
        raise NotImplementedError(self._accept_bytes)

    def finished_reading(self):
        """Declare that the caller has read everything it wants.

        The request moves to the "done" state and _finished_reading() is
        invoked so the medium can release the request.

        :raises WritingNotComplete: if finished_writing() was not called yet.
        :raises ReadingCompleted: if the request is already done.
        """
        self._ensure_reading()
        # The state changes before the hook runs, so the request counts as
        # done even if the hook raises.
        self._state = "done"
        self._finished_reading()

    def _finished_reading(self):
        """Hook for finished_reading: perform the medium-specific cleanup.

        finished_reading has already validated and updated the state when
        this is invoked.  Subclasses must override it.
        """
        raise NotImplementedError(self._finished_reading)

    def finished_writing(self):
        """End the writing phase and switch the request to reading.

        _finished_writing() is invoked so that pending request data is pushed
        along the medium.  accept_bytes() may not be called afterwards.

        :raises WritingCompleted: if the writing phase is already over.
        """
        self._ensure_writing()
        # The state changes before the hook runs, so the request counts as
        # reading even if the hook raises.
        self._state = "reading"
        self._finished_writing()

    def _finished_writing(self):
        """Hook for finished_writing: flush whatever the medium buffered.

        finished_writing has already validated and updated the state when
        this is invoked.  Subclasses must override it.
        """
        raise NotImplementedError(self._finished_writing)

    def read_bytes(self, count):
        """Read bytes from the response to this request.

        May only be called between finished_writing() and
        finished_reading(); requiring the whole request to be written first
        keeps the API usable with message based media such as HTTP.

        :param count: number of bytes requested.
        :return: whatever the subclass's _read_bytes(count) returns.
        :raises WritingNotComplete: if finished_writing() was not called yet.
        :raises ReadingCompleted: if finished_reading() was already called.
        """
        self._ensure_reading()
        return self._read_bytes(count)

    def _read_bytes(self, count):
        """Hook for read_bytes: perform the actual read.

        read_bytes has already verified that the request is in the reading
        phase when this is invoked.  Subclasses must override it.
        """
        raise NotImplementedError(self._read_bytes)
154
155
156
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
    """Request implementation used with stream based client media.

    All I/O is delegated to the medium's own _accept_bytes, _flush and
    _read_bytes methods, and the medium's _current_request attribute is used
    to make sure only one request is active on the stream at a time.
    """

    def __init__(self, medium):
        """Create the request and register it as the medium's current one.

        :raises TooManyConcurrentRequests: if the medium already has a
            current request.
        """
        SmartClientMediumRequest.__init__(self, medium)
        stream = self._medium
        # Concurrency guard.  Should some stream ever support concurrent
        # requests (e.g. by multiplexing), this check and the bookkeeping of
        # _current_request belong in SmartClientStreamMedium.get_request
        # instead; doing it here avoids the overhead for now.  RBC 20060922
        if stream._current_request is not None:
            raise errors.TooManyConcurrentRequests(stream)
        stream._current_request = self

    def _accept_bytes(self, bytes):
        """See SmartClientMediumRequest._accept_bytes.

        The bytes are passed to the medium's _accept_bytes, since the
        request writes directly onto the medium's stream.
        """
        stream = self._medium
        stream._accept_bytes(bytes)

    def _finished_reading(self):
        """See SmartClientMediumRequest._finished_reading.

        Resets the medium's _current_request to None so that a new request
        can be created on it.
        """
        stream = self._medium
        assert stream._current_request is self
        stream._current_request = None

    def _finished_writing(self):
        """See SmartClientMediumRequest._finished_writing.

        Calls the medium's _flush so that all written bytes are transmitted.
        """
        stream = self._medium
        stream._flush()

    def _read_bytes(self, count):
        """See SmartClientMediumRequest._read_bytes.

        The read is served by the medium's _read_bytes, since the request
        reads directly from the medium's stream.
        """
        stream = self._medium
        return stream._read_bytes(count)
201
202
203
class SmartServerStreamMedium(object):
    """Handles smart commands coming over a stream.

    The stream may be a pipe connected to sshd, or a tcp socket, or an
    in-process fifo for testing.

    One instance is created for each connected client; it can serve multiple
    requests in the lifetime of the connection.

    The server passes requests through to an underlying backing transport,
    which will typically be a LocalTransport looking at the server's
    filesystem.

    Concrete subclasses must implement _serve_one_request_unguarded,
    _write_out and terminate_due_to_error.
    """

    def __init__(self, backing_transport):
        """Construct new server.

        :param backing_transport: Transport for the directory served.
        """
        # backing_transport could be passed to serve instead of __init__
        self.backing_transport = backing_transport
        # Set to True (by subclasses) once the client has gone away or an
        # error ended the conversation; serve() stops looping when it is set.
        self.finished = False

    def serve(self):
        """Serve requests until the client disconnects.

        A fresh SmartServerRequestProtocolOne is created for every request.
        Any exception escaping the loop is reported on stderr and re-raised.
        """
        # Keep local references because the sys module's globals get set to
        # None during interpreter shutdown.
        from sys import exc_info, stderr
        try:
            while not self.finished:
                protocol = SmartServerRequestProtocolOne(
                    self.backing_transport, self._write_out)
                self._serve_one_request(protocol)
        except Exception:
            # exc_info() rather than binding the exception in the except
            # clause: this form parses on every Python version.
            stderr.write("%s terminating on exception %s\n"
                         % (self, exc_info()[1]))
            raise

    def _serve_one_request(self, protocol):
        """Read one request from input, process, send back a response.

        KeyboardInterrupt is propagated; any other exception raised while
        serving is swallowed and terminate_due_to_error() is called instead.

        :param protocol: a SmartServerRequestProtocol.
        """
        try:
            self._serve_one_request_unguarded(protocol)
        except KeyboardInterrupt:
            # On old Pythons KeyboardInterrupt is an Exception subclass, so
            # it must be let through explicitly.
            raise
        except Exception:
            self.terminate_due_to_error()

    def _serve_one_request_unguarded(self, protocol):
        """Feed the bytes of one request to protocol.

        Must be implemented by subclasses; exceptions raised here are
        handled by _serve_one_request.

        :param protocol: a SmartServerRequestProtocol.
        """
        raise NotImplementedError(self._serve_one_request_unguarded)

    def _write_out(self, bytes):
        """Send response bytes back to the client.

        Must be implemented by subclasses; serve() hands this method to the
        protocol object it creates.
        """
        raise NotImplementedError(self._write_out)

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        raise NotImplementedError(self.terminate_due_to_error)
254
255
256
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
    """Server medium that reads requests from, and writes responses to, a
    socket.
    """

    def __init__(self, sock, backing_transport):
        """Constructor.

        :param sock: the socket the server will read from.  It will be put
            into blocking mode.
        :param backing_transport: passed on to SmartServerStreamMedium.
        """
        SmartServerStreamMedium.__init__(self, backing_transport)
        # Bytes received beyond the end of one request; they are fed to the
        # protocol of the next request before the socket is read again.
        self.push_back = ''
        sock.setblocking(True)
        self.socket = sock

    def _serve_one_request_unguarded(self, protocol):
        """Feed bytes to protocol until it asks for no more.

        Left-over bytes from the previous request are delivered first; after
        that data is received from the socket in chunks of up to 4096 bytes.
        An empty read marks the medium as finished and returns immediately.
        Once the protocol is satisfied, its excess_buffer is saved for the
        next request.
        """
        while protocol.next_read_size():
            if self.push_back:
                protocol.accept_bytes(self.push_back)
                self.push_back = ''
                continue
            received = self.socket.recv(4096)
            if received == '':
                # The peer closed the connection.
                self.finished = True
                return
            protocol.accept_bytes(received)

        self.push_back = protocol.excess_buffer

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs.

        Closes the socket and marks the medium as finished.
        """
        # TODO: This should log to a server log file, but no such thing
        # exists yet.  Andrew Bennetts 2006-09-29.
        self.socket.close()
        self.finished = True

    def _write_out(self, bytes):
        """Send all of bytes to the client over the socket."""
        connection = self.socket
        connection.sendall(bytes)
292
293
294
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
    """Server medium that reads requests from one file object and writes
    responses to another.
    """

    def __init__(self, in_file, out_file, backing_transport):
        """Construct new server.

        :param in_file: Python file from which requests can be read.
        :param out_file: Python file to write responses.
        :param backing_transport: Transport for the directory served.
        """
        SmartServerStreamMedium.__init__(self, backing_transport)
        if sys.platform == 'win32':
            # Switch both files to binary mode; objects that do not offer a
            # fileno attribute are left untouched.
            import msvcrt
            for stream in (in_file, out_file):
                get_fileno = getattr(stream, 'fileno', None)
                if not get_fileno:
                    continue
                msvcrt.setmode(get_fileno(), os.O_BINARY)
        self._in = in_file
        self._out = out_file

    def _serve_one_request_unguarded(self, protocol):
        """Feed bytes from the input file to protocol until it is satisfied.

        Each read asks for exactly the number of bytes the protocol reports
        via next_read_size().  An empty read means the connection was closed
        and marks the medium as finished.  The output file is flushed before
        returning in either case.
        """
        wanted = protocol.next_read_size()
        while wanted != 0:
            chunk = self._in.read(wanted)
            if chunk == '':
                # Connection has been closed.
                self.finished = True
                break
            protocol.accept_bytes(chunk)
            wanted = protocol.next_read_size()
        # Reached both when the request is complete and on disconnect.
        self._out.flush()

    def terminate_due_to_error(self):
        """Close the output file and mark the medium as finished."""
        # TODO: This should log to a server log file, but no such thing
        # exists yet.  Andrew Bennetts 2006-09-29.
        self._out.close()
        self.finished = True

    def _write_out(self, bytes):
        """Write response bytes to the output file."""
        sink = self._out
        sink.write(bytes)
337
338
339
class SmartClientMedium(object):
    """Base class for media over which smart protocol requests are sent."""

    def disconnect(self):
        """Close the persistent connection, if this medium keeps one.

        This base implementation has nothing to close and does nothing.
        """
        pass
347
        
348
349
class SmartClientStreamMedium(SmartClientMedium):
    """Common base class for client media that operate on a stream.

    Every subclass hands out SmartClientStreamMediumRequest objects from
    get_request().  Those request objects call back into the medium's
    _accept_bytes, _read_bytes and _flush methods, which subclasses are
    expected to provide in order to send and receive bytes.
    """

    def __init__(self):
        # The request currently using the stream, or None when it is free.
        self._current_request = None

    def accept_bytes(self, bytes):
        """Send bytes over the stream by delegating to _accept_bytes."""
        self._accept_bytes(bytes)

    def __del__(self):
        """Disconnect when the medium is garbage collected, so the stream is
        closed once nothing uses it any more.
        """
        self.disconnect()

    def _flush(self):
        """Flush the output stream.

        SmartClientStreamMediumRequest calls this when a request has been
        completely written, so that all of its data is really sent and long
        timeouts or deadlocks are avoided.  Subclasses must override it.
        """
        raise NotImplementedError(self._flush)

    def get_request(self):
        """Return a new request object for this medium.

        The result is always a SmartClientStreamMediumRequest bound to this
        medium.
        """
        request = SmartClientStreamMediumRequest(self)
        return request

    def read_bytes(self, count):
        """Read from the stream by delegating to _read_bytes(count)."""
        data = self._read_bytes(count)
        return data
388
389
390
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
    """Client medium that talks over a pair of already opened pipes.

    The pipes are not managed by this class: they are assumed to be open for
    as long as the medium is used, and the medium never closes them.
    """

    def __init__(self, readable_pipe, writeable_pipe):
        """Remember the pipe to read responses from and the one to write
        requests to.
        """
        SmartClientStreamMedium.__init__(self)
        self._readable_pipe, self._writeable_pipe = (
            readable_pipe, writeable_pipe)

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        outgoing = self._writeable_pipe
        outgoing.write(bytes)

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        outgoing = self._writeable_pipe
        outgoing.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium._read_bytes."""
        incoming = self._readable_pipe
        return incoming.read(count)
412
413
414
class SmartSSHClientMedium(SmartClientStreamMedium):
    """A client medium using SSH.

    The SSH connection is opened lazily, the first time bytes are written,
    by running "serve --inet --directory=/ --allow-writes" with the remote
    bzr executable.
    """

    def __init__(self, host, port=None, username=None, password=None,
            vendor=None):
        """Creates a client that will connect on the first use.

        :param host: host to connect to.
        :param port: port handed to the ssh vendor; None by default.
        :param username: user name handed to the ssh vendor; None by default.
        :param password: password handed to the ssh vendor; None by default.
        :param vendor: An optional override for the ssh vendor to use. See
            bzrlib.transport.ssh for details on ssh vendors.
        """
        SmartClientStreamMedium.__init__(self)
        self._connected = False
        self._host = host
        self._password = password
        self._port = port
        self._username = username
        self._read_from = None
        self._ssh_connection = None
        self._vendor = vendor
        self._write_to = None

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._ensure_connection()
        self._write_to.write(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect().

        Both file-like channels and the ssh connection are closed.  Every
        close is attempted and the medium is marked as disconnected even if
        one of them raises; the exception still propagates to the caller.
        """
        if not self._connected:
            return
        # Nested try/finally so that a failing close() neither skips the
        # remaining handles nor leaves the medium claiming to be connected.
        try:
            self._read_from.close()
        finally:
            try:
                self._write_to.close()
            finally:
                try:
                    self._ssh_connection.close()
                finally:
                    self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected.

        The remote executable is taken from the BZR_REMOTE_PATH environment
        variable and defaults to 'bzr'.  Unless a vendor was given to the
        constructor, the ssh vendor is looked up through bzrlib.transport.ssh.
        """
        if self._connected:
            return
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
        if self._vendor is None:
            vendor = ssh._get_ssh_vendor()
        else:
            vendor = self._vendor
        self._ssh_connection = vendor.connect_ssh(self._username,
                self._password, self._host, self._port,
                command=[executable, 'serve', '--inet', '--directory=/',
                         '--allow-writes'])
        self._read_from, self._write_to = \
            self._ssh_connection.get_filelike_channels()
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._write_to.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium.read_bytes.

        :raises MediumNotConnected: if the medium is not connected; reading
            never opens the connection by itself.
        """
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._read_from.read(count)
475
476
477
class SmartTCPClientMedium(SmartClientStreamMedium):
    """A client medium using TCP.

    The socket is opened lazily, the first time bytes are written.
    """

    def __init__(self, host, port):
        """Creates a client that will connect on the first use.

        :param host: host to connect to.
        :param port: port to connect to; it is converted with int() when the
            connection is made, so a numeric string is accepted too.
        """
        SmartClientStreamMedium.__init__(self)
        self._connected = False
        self._host = host
        self._port = port
        self._socket = None

    def _accept_bytes(self, bytes):
        """See SmartClientMedium.accept_bytes."""
        self._ensure_connection()
        self._socket.sendall(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._socket.close()
        self._socket = None
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected.

        The socket is created with TCP_NODELAY set.  If connecting fails the
        socket is closed again and the medium stays disconnected.

        :raises ConnectionError: if connect_ex reports an error code.
        """
        if self._connected:
            return
        # Convert once, before any resource is acquired: the port may be a
        # string, and the error message below formats it with %d.
        port = int(self._port)
        sock = socket.socket()
        try:
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            result = sock.connect_ex((self._host, port))
            if result:
                raise errors.ConnectionError("failed to connect to %s:%d: %s" %
                        (self._host, port, os.strerror(result)))
        except:
            # Whatever went wrong, do not leak the half-set-up socket; the
            # original exception is re-raised unchanged.
            sock.close()
            raise
        self._socket = sock
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush().

        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
        add a means to do a flush, but that can be done in the future.
        """

    def _read_bytes(self, count):
        """See SmartClientMedium.read_bytes.

        The result is that of a single socket.recv(count) call.

        :raises MediumNotConnected: if the medium is not connected; reading
            never opens the connection by itself.
        """
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._socket.recv(count)
525