# Copyright (C) 2006,2007 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

"""The 'medium' layer for the smart servers and clients.

"Medium" here is the noun meaning "a means of transmission", not the adjective
for "the quality between big and small."

Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
over SSH), and pass them to and from the protocol logic.  See the overview in
bzrlib/transport/smart/__init__.py.
"""

import os
import socket
import sys

from bzrlib import errors
from bzrlib.smart.protocol import SmartServerRequestProtocolOne

try:
    from bzrlib.transport import ssh
except errors.ParamikoNotPresent:
    # no paramiko.  SmartSSHClientMedium will break.
    pass
40
class SmartClientMediumRequest(object):
    """A request on a SmartClientMedium.

    Each request allows bytes to be provided to it via accept_bytes, and then
    the response bytes to be read via read_bytes.

    For instance::

        request.accept_bytes('123')
        request.finished_writing()
        result = request.read_bytes(3)
        request.finished_reading()

    A request moves through three states, in order: "writing" (bytes may be
    supplied), "reading" (the response may be consumed) and "done".  The
    public methods enforce that ordering and then delegate to the
    underscore-prefixed hooks, which raise NotImplementedError here and must
    be provided by concrete request classes.

    It is up to the individual SmartClientMedium whether multiple concurrent
    requests can exist. See SmartClientMedium.get_request to obtain instances
    of SmartClientMediumRequest, and the concrete Medium you are using for
    details on concurrency and pipelining.
    """

    def __init__(self, medium):
        """Construct a SmartClientMediumRequest for the medium ``medium``.

        :param medium: The medium this request is carried over.
        """
        # Subclasses reach the medium through this attribute.
        self._medium = medium
        # we track state by constants - we may want to use the same
        # pattern as BodyReader if it gets more complex.
        # valid states are: "writing", "reading", "done"
        self._state = "writing"

    def accept_bytes(self, bytes):
        """Accept bytes for inclusion in this request.

        This method may not be called after finished_writing() has been
        called.  It depends upon the Medium whether or not the bytes will be
        immediately transmitted. Message based Mediums will tend to buffer the
        bytes until finished_writing() is called.

        :param bytes: A bytestring.
        :raises errors.WritingCompleted: if the request is no longer in the
            "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._accept_bytes(bytes)

    def _accept_bytes(self, bytes):
        """Helper for accept_bytes.

        accept_bytes checks the state of the request to determine if bytes
        should be accepted. After that it hands off to _accept_bytes to do the
        actual acceptance.
        """
        raise NotImplementedError(self._accept_bytes)

    def finished_reading(self):
        """Inform the request that all desired data has been read.

        This will remove the request from the pipeline for its medium (if the
        medium supports pipelining) and any further calls to methods on the
        request will raise ReadingCompleted.

        :raises errors.WritingNotComplete: if finished_writing() has not been
            called yet.
        :raises errors.ReadingCompleted: if reading was already finished.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        # Leave the "reading" state before delegating, mirroring
        # finished_writing(); without this the ReadingCompleted checks above
        # and in read_bytes() could never trigger.
        self._state = "done"
        self._finished_reading()

    def _finished_reading(self):
        """Helper for finished_reading.

        finished_reading checks the state of the request to determine if
        finished_reading is allowed, and if it is hands off to _finished_reading
        to perform the action.
        """
        raise NotImplementedError(self._finished_reading)

    def finished_writing(self):
        """Finish the writing phase of this request.

        This will flush all pending data for this request along the medium.
        After calling finished_writing, you may not call accept_bytes anymore.

        :raises errors.WritingCompleted: if the request is no longer in the
            "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._state = "reading"
        self._finished_writing()

    def _finished_writing(self):
        """Helper for finished_writing.

        finished_writing checks the state of the request to determine if
        finished_writing is allowed, and if it is hands off to _finished_writing
        to perform the action.
        """
        raise NotImplementedError(self._finished_writing)

    def read_bytes(self, count):
        """Read bytes from this request's response.

        This method will block and wait for count bytes to be read. It may not
        be invoked until finished_writing() has been called - this is to ensure
        a message-based approach to requests, for compatibility with message
        based mediums like HTTP.

        :param count: The number of bytes to read.
        :return: Whatever the concrete _read_bytes hook returns.
        :raises errors.WritingNotComplete: if finished_writing() has not been
            called yet.
        :raises errors.ReadingCompleted: if finished_reading() was already
            called.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        return self._read_bytes(count)

    def _read_bytes(self, count):
        """Helper for read_bytes.

        read_bytes checks the state of the request to determine if bytes
        should be read. After that it hands off to _read_bytes to do the
        actual read.
        """
        raise NotImplementedError(self._read_bytes)
156
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""

    def __init__(self, medium):
        """Create the request and claim ``medium`` as its current request.

        :param medium: The stream medium the request runs over.
        :raises errors.TooManyConcurrentRequests: if the medium already has a
            current request.
        """
        SmartClientMediumRequest.__init__(self, medium)
        # check that we are safe concurrency wise. If some streams start
        # allowing concurrent requests - i.e. via multiplexing - then this
        # assert should be moved to SmartClientStreamMedium.get_request,
        # and the setting/unsetting of _current_request likewise moved into
        # that class : but its unneeded overhead for now. RBC 20060922
        stream_medium = self._medium
        if stream_medium._current_request is not None:
            raise errors.TooManyConcurrentRequests(stream_medium)
        stream_medium._current_request = self

    def _accept_bytes(self, bytes):
        """See SmartClientMediumRequest._accept_bytes.

        The bytes are handed straight to self._medium._accept_bytes, since
        the request operates on the medium's stream.
        """
        self._medium._accept_bytes(bytes)

    def _finished_reading(self):
        """See SmartClientMediumRequest._finished_reading.

        Resets _current_request on self._medium so that a new request can be
        created.
        """
        stream_medium = self._medium
        assert stream_medium._current_request is self
        stream_medium._current_request = None

    def _finished_writing(self):
        """See SmartClientMediumRequest._finished_writing.

        Calls self._medium._flush so that all bytes are transmitted.
        """
        self._medium._flush()

    def _read_bytes(self, count):
        """See SmartClientMediumRequest._read_bytes.

        The read is delegated to self._medium._read_bytes, since the request
        operates on the medium's stream.
        """
        return self._medium._read_bytes(count)
203
class SmartServerStreamMedium(object):
    """Handles smart commands coming over a stream.

    The stream may be a pipe connected to sshd, or a tcp socket, or an
    in-process fifo for testing.

    One instance is created for each connected client; it can serve multiple
    requests in the lifetime of the connection.

    The server passes requests through to an underlying backing transport,
    which will typically be a LocalTransport looking at the server's filesystem.

    Concrete subclasses supply _serve_one_request_unguarded(protocol),
    _write_out(bytes) and terminate_due_to_error().
    """

    def __init__(self, backing_transport):
        """Construct new server.

        :param backing_transport: Transport for the directory served.
        """
        # backing_transport could be passed to serve instead of __init__
        self.backing_transport = backing_transport
        # Set to True by subclasses to make serve() stop looping.
        self.finished = False

    def serve(self):
        """Serve requests until the client disconnects."""
        # Keep a reference to stderr because the sys module's globals get set to
        # None during interpreter shutdown.
        from sys import stderr
        try:
            while not self.finished:
                # NOTE(review): assumes the protocol takes the
                # response-writing callable as its second argument - confirm
                # against SmartServerRequestProtocolOne.
                protocol = SmartServerRequestProtocolOne(
                    self.backing_transport, self._write_out)
                self._serve_one_request(protocol)
        except Exception as e:
            stderr.write("%s terminating on exception %s\n" % (self, e))
            # Report, then let the caller see the failure as well.
            raise

    def _serve_one_request(self, protocol):
        """Read one request from input, process, send back a response.

        Any exception other than KeyboardInterrupt raised while serving is
        swallowed here and converted into a call to terminate_due_to_error().

        :param protocol: a SmartServerRequestProtocol.
        """
        try:
            self._serve_one_request_unguarded(protocol)
        except KeyboardInterrupt:
            # Listed explicitly so that it is never swallowed by the generic
            # handler below, whatever its place in the exception hierarchy.
            raise
        except Exception:
            self.terminate_due_to_error()

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        raise NotImplementedError(self.terminate_due_to_error)
256
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
    """A SmartServerStreamMedium that reads from and writes to a socket."""

    def __init__(self, sock, backing_transport):
        """Constructor.

        :param sock: the socket the server will read from.  It will be put
            into blocking mode.
        :param backing_transport: Transport for the directory served.
        """
        SmartServerStreamMedium.__init__(self, backing_transport)
        # Bytes received past the end of one request, kept for the next one.
        self.push_back = ''
        sock.setblocking(True)
        self.socket = sock

    def _serve_one_request_unguarded(self, protocol):
        """Feed bytes to ``protocol`` until it asks for no more.

        Bytes left over from the previous request are delivered first; after
        that data is received from the socket in chunks of up to 4096 bytes.
        An empty read marks the medium as finished and returns immediately.

        :param protocol: the protocol object consuming the request bytes.
        """
        while protocol.next_read_size():
            if self.push_back:
                protocol.accept_bytes(self.push_back)
                self.push_back = ''
            else:
                data = self.socket.recv(4096)
                if not data:
                    # recv() returning nothing means the peer closed the
                    # connection; stop serving instead of spinning.
                    self.finished = True
                    return
                protocol.accept_bytes(data)

        # Whatever the protocol did not consume belongs to the next request.
        self.push_back = protocol.excess_buffer

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        # TODO: This should log to a server log file, but no such thing
        # exists yet.  Andrew Bennetts 2006-09-29.
        # NOTE(review): closing the socket here is an assumption about the
        # intended shutdown behaviour - confirm.  Setting finished is what
        # stops the serve() loop.
        self.socket.close()
        self.finished = True

    def _write_out(self, bytes):
        """Send ``bytes`` to the client over the socket."""
        self.socket.sendall(bytes)
294
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
    """A SmartServerStreamMedium that uses a pair of file-like objects."""

    def __init__(self, in_file, out_file, backing_transport):
        """Construct new server.

        :param in_file: Python file from which requests can be read.
        :param out_file: Python file to write responses.
        :param backing_transport: Transport for the directory served.
        """
        SmartServerStreamMedium.__init__(self, backing_transport)
        if sys.platform == 'win32':
            # force binary mode for files
            import msvcrt
            for f in (in_file, out_file):
                # File-like objects without a fileno() are left untouched.
                fileno = getattr(f, 'fileno', None)
                if fileno:
                    msvcrt.setmode(fileno(), os.O_BINARY)
        self._in = in_file
        self._out = out_file

    def _serve_one_request_unguarded(self, protocol):
        """Feed bytes from the input file to ``protocol`` for one request.

        Reads exactly as many bytes as the protocol asks for each time, and
        returns once the protocol wants no more or the input is exhausted.
        The output file is flushed before returning because _write_out()
        only writes to it.

        :param protocol: the protocol object consuming the request bytes.
        """
        while True:
            bytes_to_read = protocol.next_read_size()
            if bytes_to_read == 0:
                # Finished serving this request.
                self._out.flush()
                return
            data = self._in.read(bytes_to_read)
            if not data:
                # Connection has been closed.
                self.finished = True
                self._out.flush()
                return
            protocol.accept_bytes(data)

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        # TODO: This should log to a server log file, but no such thing
        # exists yet.  Andrew Bennetts 2006-09-29.
        # NOTE(review): closing the output file here is an assumption about
        # the intended shutdown behaviour - confirm.  Setting finished is
        # what stops the serve() loop.
        self._out.close()
        self.finished = True

    def _write_out(self, bytes):
        """Write ``bytes`` to the output file (without flushing)."""
        self._out.write(bytes)
339
class SmartClientMedium(object):
    """Smart client is a medium for sending smart protocol requests over."""

    def disconnect(self):
        """Close the persistent connection, if this medium keeps one.

        This base implementation is a no-op; mediums that hold a connection
        override it.
        """
        return None
349
class SmartClientStreamMedium(SmartClientMedium):
    """Stream based medium common class.

    SmartClientStreamMediums operate on a stream. All subclasses use a common
    SmartClientStreamMediumRequest for their requests, and should implement
    _accept_bytes and _read_bytes to allow the request objects to send and
    receive bytes.
    """

    def __init__(self):
        # The request currently using the stream, or None when it is free.
        # SmartClientStreamMediumRequest sets and clears this.
        self._current_request = None

    def accept_bytes(self, bytes):
        """Send ``bytes`` on the stream via the subclass's _accept_bytes."""
        self._accept_bytes(bytes)

    def __del__(self):
        """The SmartClientStreamMedium knows how to close the stream when it is
        finished with it.
        """
        # NOTE(review): presumed to be the finalizer that disconnects the
        # medium - confirm that this cleanup belongs in __del__.
        self.disconnect()

    def _flush(self):
        """Flush the output stream.

        This method is used by the SmartClientStreamMediumRequest to ensure that
        all data for a request is sent, to avoid long timeouts or deadlocks.
        """
        raise NotImplementedError(self._flush)

    def get_request(self):
        """See SmartClientMedium.get_request().

        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
        for get_request.
        """
        return SmartClientStreamMediumRequest(self)

    def read_bytes(self, count):
        """Read from the stream via the subclass's _read_bytes."""
        return self._read_bytes(count)
390
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
    """A client medium using simple pipes.

    This client does not manage the pipes: it assumes they will always be open.
    """

    def __init__(self, readable_pipe, writeable_pipe):
        """Create a medium over an existing pair of pipes.

        :param readable_pipe: file-like object responses are read from.
        :param writeable_pipe: file-like object requests are written to.
        """
        SmartClientStreamMedium.__init__(self)
        self._readable_pipe = readable_pipe
        self._writeable_pipe = writeable_pipe

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._writeable_pipe.write(bytes)

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._writeable_pipe.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium._read_bytes."""
        return self._readable_pipe.read(count)
414
class SmartSSHClientMedium(SmartClientStreamMedium):
    """A client medium using SSH."""

    def __init__(self, host, port=None, username=None, password=None,
            vendor=None):
        """Creates a client that will connect on the first use.

        :param host: host to connect to.
        :param port: port to connect to; passed through to the ssh vendor.
        :param username: user name passed through to the ssh vendor.
        :param password: password passed through to the ssh vendor.
        :param vendor: An optional override for the ssh vendor to use. See
            bzrlib.transport.ssh for details on ssh vendors.
        """
        SmartClientStreamMedium.__init__(self)
        self._connected = False
        self._host = host
        self._password = password
        self._port = port
        self._username = username
        self._read_from = None
        self._ssh_connection = None
        self._vendor = vendor
        self._write_to = None

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._ensure_connection()
        self._write_to.write(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._read_from.close()
        self._write_to.close()
        self._ssh_connection.close()
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected."""
        if self._connected:
            return
        # The remote executable can be overridden from the environment.
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
        if self._vendor is None:
            vendor = ssh._get_ssh_vendor()
        else:
            vendor = self._vendor
        # NOTE(review): '--allow-writes' is assumed to be the final element
        # of the remote command line - confirm against the server's options.
        self._ssh_connection = vendor.connect_ssh(self._username,
                self._password, self._host, self._port,
                command=[executable, 'serve', '--inet', '--directory=/',
                         '--allow-writes'])
        self._read_from, self._write_to = \
            self._ssh_connection.get_filelike_channels()
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._write_to.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium.read_bytes.

        :raises errors.MediumNotConnected: if no connection has been made.
        """
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._read_from.read(count)
477
class SmartTCPClientMedium(SmartClientStreamMedium):
    """A client medium using TCP."""

    def __init__(self, host, port):
        """Creates a client that will connect on the first use.

        :param host: host to connect to.
        :param port: port to connect to; converted with int() when the
            connection is made.
        """
        SmartClientStreamMedium.__init__(self)
        self._connected = False
        self._host = host
        self._port = port
        self._socket = None

    def _accept_bytes(self, bytes):
        """See SmartClientMedium.accept_bytes."""
        self._ensure_connection()
        self._socket.sendall(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._socket.close()
        self._socket = None
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected.

        :raises errors.ConnectionError: if connect_ex() reports a failure.
        """
        if self._connected:
            return
        # Convert once and reuse: the error message formats the port with
        # %d, which would itself fail on a port that is not a number.
        port = int(self._port)
        sock = socket.socket()
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        result = sock.connect_ex((self._host, port))
        if result:
            # Do not leak the unconnected socket on failure.
            sock.close()
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
                    (self._host, port, os.strerror(result)))
        self._socket = sock
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush().

        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
        add a means to do a flush, but that can be done in the future.
        """

    def _read_bytes(self, count):
        """See SmartClientMedium.read_bytes.

        :raises errors.MediumNotConnected: if no connection has been made.
        """
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._socket.recv(count)