/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Andrew Bennetts
  • Date: 2007-04-10 02:22:55 UTC
  • mto: This revision was merged to the branch mainline in revision 2402.
  • Revision ID: andrew.bennetts@canonical.com-20070410022255-e1dhysj2zhukca5c
Add some missing docstrings and copyright boilerplate.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006,2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""The 'medium' layer for the smart servers and clients.
 
18
 
 
19
"Medium" here is the noun meaning "a means of transmission", not the adjective
 
20
for "the quality between big and small."
 
21
 
 
22
Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
 
23
over SSH), and pass them to and from the protocol logic.  See the overview in
 
24
bzrlib/transport/smart/__init__.py.
 
25
"""
 
26
 
 
27
import os
 
28
import socket
 
29
import sys
 
30
from bzrlib import errors
 
31
from bzrlib.smart.protocol import SmartServerRequestProtocolOne
 
32
 
 
33
try:
 
34
    from bzrlib.transport import ssh
 
35
except errors.ParamikoNotPresent:
 
36
    # no paramiko.  SmartSSHClientMedium will break.
 
37
    pass
 
38
 
 
39
 
 
40
class SmartClientMediumRequest(object):
 
41
    """A request on a SmartClientMedium.
 
42
 
 
43
    Each request allows bytes to be provided to it via accept_bytes, and then
 
44
    the response bytes to be read via read_bytes.
 
45
 
 
46
    For instance:
 
47
    request.accept_bytes('123')
 
48
    request.finished_writing()
 
49
    result = request.read_bytes(3)
 
50
    request.finished_reading()
 
51
 
 
52
    It is up to the individual SmartClientMedium whether multiple concurrent
 
53
    requests can exist. See SmartClientMedium.get_request to obtain instances 
 
54
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
55
    details on concurrency and pipelining.
 
56
    """
 
57
 
 
58
    def __init__(self, medium):
 
59
        """Construct a SmartClientMediumRequest for the medium medium."""
 
60
        self._medium = medium
 
61
        # we track state by constants - we may want to use the same
 
62
        # pattern as BodyReader if it gets more complex.
 
63
        # valid states are: "writing", "reading", "done"
 
64
        self._state = "writing"
 
65
 
 
66
    def accept_bytes(self, bytes):
 
67
        """Accept bytes for inclusion in this request.
 
68
 
 
69
        This method may not be be called after finished_writing() has been
 
70
        called.  It depends upon the Medium whether or not the bytes will be
 
71
        immediately transmitted. Message based Mediums will tend to buffer the
 
72
        bytes until finished_writing() is called.
 
73
 
 
74
        :param bytes: A bytestring.
 
75
        """
 
76
        if self._state != "writing":
 
77
            raise errors.WritingCompleted(self)
 
78
        self._accept_bytes(bytes)
 
79
 
 
80
    def _accept_bytes(self, bytes):
 
81
        """Helper for accept_bytes.
 
82
 
 
83
        Accept_bytes checks the state of the request to determing if bytes
 
84
        should be accepted. After that it hands off to _accept_bytes to do the
 
85
        actual acceptance.
 
86
        """
 
87
        raise NotImplementedError(self._accept_bytes)
 
88
 
 
89
    def finished_reading(self):
 
90
        """Inform the request that all desired data has been read.
 
91
 
 
92
        This will remove the request from the pipeline for its medium (if the
 
93
        medium supports pipelining) and any further calls to methods on the
 
94
        request will raise ReadingCompleted.
 
95
        """
 
96
        if self._state == "writing":
 
97
            raise errors.WritingNotComplete(self)
 
98
        if self._state != "reading":
 
99
            raise errors.ReadingCompleted(self)
 
100
        self._state = "done"
 
101
        self._finished_reading()
 
102
 
 
103
    def _finished_reading(self):
 
104
        """Helper for finished_reading.
 
105
 
 
106
        finished_reading checks the state of the request to determine if 
 
107
        finished_reading is allowed, and if it is hands off to _finished_reading
 
108
        to perform the action.
 
109
        """
 
110
        raise NotImplementedError(self._finished_reading)
 
111
 
 
112
    def finished_writing(self):
 
113
        """Finish the writing phase of this request.
 
114
 
 
115
        This will flush all pending data for this request along the medium.
 
116
        After calling finished_writing, you may not call accept_bytes anymore.
 
117
        """
 
118
        if self._state != "writing":
 
119
            raise errors.WritingCompleted(self)
 
120
        self._state = "reading"
 
121
        self._finished_writing()
 
122
 
 
123
    def _finished_writing(self):
 
124
        """Helper for finished_writing.
 
125
 
 
126
        finished_writing checks the state of the request to determine if 
 
127
        finished_writing is allowed, and if it is hands off to _finished_writing
 
128
        to perform the action.
 
129
        """
 
130
        raise NotImplementedError(self._finished_writing)
 
131
 
 
132
    def read_bytes(self, count):
 
133
        """Read bytes from this requests response.
 
134
 
 
135
        This method will block and wait for count bytes to be read. It may not
 
136
        be invoked until finished_writing() has been called - this is to ensure
 
137
        a message-based approach to requests, for compatability with message
 
138
        based mediums like HTTP.
 
139
        """
 
140
        if self._state == "writing":
 
141
            raise errors.WritingNotComplete(self)
 
142
        if self._state != "reading":
 
143
            raise errors.ReadingCompleted(self)
 
144
        return self._read_bytes(count)
 
145
 
 
146
    def _read_bytes(self, count):
 
147
        """Helper for read_bytes.
 
148
 
 
149
        read_bytes checks the state of the request to determing if bytes
 
150
        should be read. After that it hands off to _read_bytes to do the
 
151
        actual read.
 
152
        """
 
153
        raise NotImplementedError(self._read_bytes)
 
154
 
 
155
 
 
156
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
 
157
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
 
158
 
 
159
    def __init__(self, medium):
 
160
        SmartClientMediumRequest.__init__(self, medium)
 
161
        # check that we are safe concurrency wise. If some streams start
 
162
        # allowing concurrent requests - i.e. via multiplexing - then this
 
163
        # assert should be moved to SmartClientStreamMedium.get_request,
 
164
        # and the setting/unsetting of _current_request likewise moved into
 
165
        # that class : but its unneeded overhead for now. RBC 20060922
 
166
        if self._medium._current_request is not None:
 
167
            raise errors.TooManyConcurrentRequests(self._medium)
 
168
        self._medium._current_request = self
 
169
 
 
170
    def _accept_bytes(self, bytes):
 
171
        """See SmartClientMediumRequest._accept_bytes.
 
172
        
 
173
        This forwards to self._medium._accept_bytes because we are operating
 
174
        on the mediums stream.
 
175
        """
 
176
        self._medium._accept_bytes(bytes)
 
177
 
 
178
    def _finished_reading(self):
 
179
        """See SmartClientMediumRequest._finished_reading.
 
180
 
 
181
        This clears the _current_request on self._medium to allow a new 
 
182
        request to be created.
 
183
        """
 
184
        assert self._medium._current_request is self
 
185
        self._medium._current_request = None
 
186
        
 
187
    def _finished_writing(self):
 
188
        """See SmartClientMediumRequest._finished_writing.
 
189
 
 
190
        This invokes self._medium._flush to ensure all bytes are transmitted.
 
191
        """
 
192
        self._medium._flush()
 
193
 
 
194
    def _read_bytes(self, count):
 
195
        """See SmartClientMediumRequest._read_bytes.
 
196
        
 
197
        This forwards to self._medium._read_bytes because we are operating
 
198
        on the mediums stream.
 
199
        """
 
200
        return self._medium._read_bytes(count)
 
201
 
 
202
 
 
203
class SmartServerStreamMedium(object):
 
204
    """Handles smart commands coming over a stream.
 
205
 
 
206
    The stream may be a pipe connected to sshd, or a tcp socket, or an
 
207
    in-process fifo for testing.
 
208
 
 
209
    One instance is created for each connected client; it can serve multiple
 
210
    requests in the lifetime of the connection.
 
211
 
 
212
    The server passes requests through to an underlying backing transport, 
 
213
    which will typically be a LocalTransport looking at the server's filesystem.
 
214
    """
 
215
 
 
216
    def __init__(self, backing_transport):
 
217
        """Construct new server.
 
218
 
 
219
        :param backing_transport: Transport for the directory served.
 
220
        """
 
221
        # backing_transport could be passed to serve instead of __init__
 
222
        self.backing_transport = backing_transport
 
223
        self.finished = False
 
224
 
 
225
    def serve(self):
 
226
        """Serve requests until the client disconnects."""
 
227
        # Keep a reference to stderr because the sys module's globals get set to
 
228
        # None during interpreter shutdown.
 
229
        from sys import stderr
 
230
        try:
 
231
            while not self.finished:
 
232
                protocol = SmartServerRequestProtocolOne(self.backing_transport,
 
233
                                                         self._write_out)
 
234
                self._serve_one_request(protocol)
 
235
        except Exception, e:
 
236
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
237
            raise
 
238
 
 
239
    def _serve_one_request(self, protocol):
 
240
        """Read one request from input, process, send back a response.
 
241
        
 
242
        :param protocol: a SmartServerRequestProtocol.
 
243
        """
 
244
        try:
 
245
            self._serve_one_request_unguarded(protocol)
 
246
        except KeyboardInterrupt:
 
247
            raise
 
248
        except Exception, e:
 
249
            self.terminate_due_to_error()
 
250
 
 
251
    def terminate_due_to_error(self):
 
252
        """Called when an unhandled exception from the protocol occurs."""
 
253
        raise NotImplementedError(self.terminate_due_to_error)
 
254
 
 
255
 
 
256
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
 
257
 
 
258
    def __init__(self, sock, backing_transport):
 
259
        """Constructor.
 
260
 
 
261
        :param sock: the socket the server will read from.  It will be put
 
262
            into blocking mode.
 
263
        """
 
264
        SmartServerStreamMedium.__init__(self, backing_transport)
 
265
        self.push_back = ''
 
266
        sock.setblocking(True)
 
267
        self.socket = sock
 
268
 
 
269
    def _serve_one_request_unguarded(self, protocol):
 
270
        while protocol.next_read_size():
 
271
            if self.push_back:
 
272
                protocol.accept_bytes(self.push_back)
 
273
                self.push_back = ''
 
274
            else:
 
275
                bytes = self.socket.recv(4096)
 
276
                if bytes == '':
 
277
                    self.finished = True
 
278
                    return
 
279
                protocol.accept_bytes(bytes)
 
280
        
 
281
        self.push_back = protocol.excess_buffer
 
282
    
 
283
    def terminate_due_to_error(self):
 
284
        """Called when an unhandled exception from the protocol occurs."""
 
285
        # TODO: This should log to a server log file, but no such thing
 
286
        # exists yet.  Andrew Bennetts 2006-09-29.
 
287
        self.socket.close()
 
288
        self.finished = True
 
289
 
 
290
    def _write_out(self, bytes):
 
291
        self.socket.sendall(bytes)
 
292
 
 
293
 
 
294
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
 
295
 
 
296
    def __init__(self, in_file, out_file, backing_transport):
 
297
        """Construct new server.
 
298
 
 
299
        :param in_file: Python file from which requests can be read.
 
300
        :param out_file: Python file to write responses.
 
301
        :param backing_transport: Transport for the directory served.
 
302
        """
 
303
        SmartServerStreamMedium.__init__(self, backing_transport)
 
304
        if sys.platform == 'win32':
 
305
            # force binary mode for files
 
306
            import msvcrt
 
307
            for f in (in_file, out_file):
 
308
                fileno = getattr(f, 'fileno', None)
 
309
                if fileno:
 
310
                    msvcrt.setmode(fileno(), os.O_BINARY)
 
311
        self._in = in_file
 
312
        self._out = out_file
 
313
 
 
314
    def _serve_one_request_unguarded(self, protocol):
 
315
        while True:
 
316
            bytes_to_read = protocol.next_read_size()
 
317
            if bytes_to_read == 0:
 
318
                # Finished serving this request.
 
319
                self._out.flush()
 
320
                return
 
321
            bytes = self._in.read(bytes_to_read)
 
322
            if bytes == '':
 
323
                # Connection has been closed.
 
324
                self.finished = True
 
325
                self._out.flush()
 
326
                return
 
327
            protocol.accept_bytes(bytes)
 
328
 
 
329
    def terminate_due_to_error(self):
 
330
        # TODO: This should log to a server log file, but no such thing
 
331
        # exists yet.  Andrew Bennetts 2006-09-29.
 
332
        self._out.close()
 
333
        self.finished = True
 
334
 
 
335
    def _write_out(self, bytes):
 
336
        self._out.write(bytes)
 
337
 
 
338
 
 
339
class SmartClientMedium(object):
 
340
    """Smart client is a medium for sending smart protocol requests over."""
 
341
 
 
342
    def disconnect(self):
 
343
        """If this medium maintains a persistent connection, close it.
 
344
        
 
345
        The default implementation does nothing.
 
346
        """
 
347
        
 
348
 
 
349
class SmartClientStreamMedium(SmartClientMedium):
 
350
    """Stream based medium common class.
 
351
 
 
352
    SmartClientStreamMediums operate on a stream. All subclasses use a common
 
353
    SmartClientStreamMediumRequest for their requests, and should implement
 
354
    _accept_bytes and _read_bytes to allow the request objects to send and
 
355
    receive bytes.
 
356
    """
 
357
 
 
358
    def __init__(self):
 
359
        self._current_request = None
 
360
 
 
361
    def accept_bytes(self, bytes):
 
362
        self._accept_bytes(bytes)
 
363
 
 
364
    def __del__(self):
 
365
        """The SmartClientStreamMedium knows how to close the stream when it is
 
366
        finished with it.
 
367
        """
 
368
        self.disconnect()
 
369
 
 
370
    def _flush(self):
 
371
        """Flush the output stream.
 
372
        
 
373
        This method is used by the SmartClientStreamMediumRequest to ensure that
 
374
        all data for a request is sent, to avoid long timeouts or deadlocks.
 
375
        """
 
376
        raise NotImplementedError(self._flush)
 
377
 
 
378
    def get_request(self):
 
379
        """See SmartClientMedium.get_request().
 
380
 
 
381
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
 
382
        for get_request.
 
383
        """
 
384
        return SmartClientStreamMediumRequest(self)
 
385
 
 
386
    def read_bytes(self, count):
 
387
        return self._read_bytes(count)
 
388
 
 
389
 
 
390
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
 
391
    """A client medium using simple pipes.
 
392
    
 
393
    This client does not manage the pipes: it assumes they will always be open.
 
394
    """
 
395
 
 
396
    def __init__(self, readable_pipe, writeable_pipe):
 
397
        SmartClientStreamMedium.__init__(self)
 
398
        self._readable_pipe = readable_pipe
 
399
        self._writeable_pipe = writeable_pipe
 
400
 
 
401
    def _accept_bytes(self, bytes):
 
402
        """See SmartClientStreamMedium.accept_bytes."""
 
403
        self._writeable_pipe.write(bytes)
 
404
 
 
405
    def _flush(self):
 
406
        """See SmartClientStreamMedium._flush()."""
 
407
        self._writeable_pipe.flush()
 
408
 
 
409
    def _read_bytes(self, count):
 
410
        """See SmartClientStreamMedium._read_bytes."""
 
411
        return self._readable_pipe.read(count)
 
412
 
 
413
 
 
414
class SmartSSHClientMedium(SmartClientStreamMedium):
 
415
    """A client medium using SSH."""
 
416
    
 
417
    def __init__(self, host, port=None, username=None, password=None,
 
418
            vendor=None):
 
419
        """Creates a client that will connect on the first use.
 
420
        
 
421
        :param vendor: An optional override for the ssh vendor to use. See
 
422
            bzrlib.transport.ssh for details on ssh vendors.
 
423
        """
 
424
        SmartClientStreamMedium.__init__(self)
 
425
        self._connected = False
 
426
        self._host = host
 
427
        self._password = password
 
428
        self._port = port
 
429
        self._username = username
 
430
        self._read_from = None
 
431
        self._ssh_connection = None
 
432
        self._vendor = vendor
 
433
        self._write_to = None
 
434
 
 
435
    def _accept_bytes(self, bytes):
 
436
        """See SmartClientStreamMedium.accept_bytes."""
 
437
        self._ensure_connection()
 
438
        self._write_to.write(bytes)
 
439
 
 
440
    def disconnect(self):
 
441
        """See SmartClientMedium.disconnect()."""
 
442
        if not self._connected:
 
443
            return
 
444
        self._read_from.close()
 
445
        self._write_to.close()
 
446
        self._ssh_connection.close()
 
447
        self._connected = False
 
448
 
 
449
    def _ensure_connection(self):
 
450
        """Connect this medium if not already connected."""
 
451
        if self._connected:
 
452
            return
 
453
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
454
        if self._vendor is None:
 
455
            vendor = ssh._get_ssh_vendor()
 
456
        else:
 
457
            vendor = self._vendor
 
458
        self._ssh_connection = vendor.connect_ssh(self._username,
 
459
                self._password, self._host, self._port,
 
460
                command=[executable, 'serve', '--inet', '--directory=/',
 
461
                         '--allow-writes'])
 
462
        self._read_from, self._write_to = \
 
463
            self._ssh_connection.get_filelike_channels()
 
464
        self._connected = True
 
465
 
 
466
    def _flush(self):
 
467
        """See SmartClientStreamMedium._flush()."""
 
468
        self._write_to.flush()
 
469
 
 
470
    def _read_bytes(self, count):
 
471
        """See SmartClientStreamMedium.read_bytes."""
 
472
        if not self._connected:
 
473
            raise errors.MediumNotConnected(self)
 
474
        return self._read_from.read(count)
 
475
 
 
476
 
 
477
class SmartTCPClientMedium(SmartClientStreamMedium):
 
478
    """A client medium using TCP."""
 
479
    
 
480
    def __init__(self, host, port):
 
481
        """Creates a client that will connect on the first use."""
 
482
        SmartClientStreamMedium.__init__(self)
 
483
        self._connected = False
 
484
        self._host = host
 
485
        self._port = port
 
486
        self._socket = None
 
487
 
 
488
    def _accept_bytes(self, bytes):
 
489
        """See SmartClientMedium.accept_bytes."""
 
490
        self._ensure_connection()
 
491
        self._socket.sendall(bytes)
 
492
 
 
493
    def disconnect(self):
 
494
        """See SmartClientMedium.disconnect()."""
 
495
        if not self._connected:
 
496
            return
 
497
        self._socket.close()
 
498
        self._socket = None
 
499
        self._connected = False
 
500
 
 
501
    def _ensure_connection(self):
 
502
        """Connect this medium if not already connected."""
 
503
        if self._connected:
 
504
            return
 
505
        self._socket = socket.socket()
 
506
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
507
        result = self._socket.connect_ex((self._host, int(self._port)))
 
508
        if result:
 
509
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
 
510
                    (self._host, self._port, os.strerror(result)))
 
511
        self._connected = True
 
512
 
 
513
    def _flush(self):
 
514
        """See SmartClientStreamMedium._flush().
 
515
        
 
516
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
517
        add a means to do a flush, but that can be done in the future.
 
518
        """
 
519
 
 
520
    def _read_bytes(self, count):
 
521
        """See SmartClientMedium.read_bytes."""
 
522
        if not self._connected:
 
523
            raise errors.MediumNotConnected(self)
 
524
        return self._socket.recv(count)
 
525