1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""The 'medium' layer for the smart servers and clients.
19
"Medium" here is the noun meaning "a means of transmission", not the adjective
20
for "the quality between big and small."
22
Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
23
over SSH), and pass them to and from the protocol logic. See the overview in
24
bzrlib/transport/smart/__init__.py.
36
from bzrlib.smart.protocol import (
37
MESSAGE_VERSION_THREE,
39
SmartClientRequestProtocolOne,
40
SmartServerRequestProtocolOne,
41
SmartServerRequestProtocolTwo,
42
build_server_protocol_three
44
from bzrlib.transport import ssh
47
def _get_protocol_factory_for_bytes(bytes):
    """Determine the right protocol factory for 'bytes'.

    The protocol version is recognised from the marker the request starts
    with: MESSAGE_VERSION_THREE selects build_server_protocol_three,
    REQUEST_VERSION_TWO selects SmartServerRequestProtocolTwo, and a request
    that starts with neither marker is handled as protocol version one.  A
    recognised marker is stripped from the bytes that are handed back; a
    version one request has no marker, so its bytes come back untouched.

    The bytes should have at least one newline byte (i.e. be a whole line),
    otherwise a truncated marker fails to match and the request is
    incorrectly identified as version one.

    Typical use would be::

        factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
        server_protocol = factory(transport, write_func, root_client_path)
        server_protocol.accept_bytes(unused_bytes)

    :param bytes: a str of bytes of the start of the request.
    :returns: 2-tuple of (protocol_factory, unused_bytes).  protocol_factory is
        a callable that takes three args: transport, write_func,
        root_client_path.  unused_bytes are any bytes that were not part of a
        protocol version marker.
    """
    if bytes.startswith(MESSAGE_VERSION_THREE):
        protocol_factory = build_server_protocol_three
        bytes = bytes[len(MESSAGE_VERSION_THREE):]
    elif bytes.startswith(REQUEST_VERSION_TWO):
        protocol_factory = SmartServerRequestProtocolTwo
        bytes = bytes[len(REQUEST_VERSION_TWO):]
    else:
        # No version marker: only reached when neither prefix matched, so the
        # newer protocols chosen above are never overwritten.
        protocol_factory = SmartServerRequestProtocolOne
    return protocol_factory, bytes
79
class SmartServerStreamMedium(object):
    """Handles smart commands coming over a stream.

    The stream may be a pipe connected to sshd, or a tcp socket, or an
    in-process fifo for testing.

    One instance is created for each connected client; it can serve multiple
    requests in the lifetime of the connection.

    The server passes requests through to an underlying backing transport,
    which will typically be a LocalTransport looking at the server's filesystem.

    This base class holds the request loop and the push-back buffer.
    Subclasses provide the stream specific parts that are used but not defined
    here: _get_bytes, _write_out, _serve_one_request_unguarded and
    terminate_due_to_error.

    :ivar _push_back_buffer: a str of bytes that have been read from the stream
        but not used yet, or None if there are no buffered bytes.  Subclasses
        should make sure to exhaust this buffer before reading more bytes from
        the stream.  See also the _push_back method.
    """

    def __init__(self, backing_transport, root_client_path='/'):
        """Construct new server.

        :param backing_transport: Transport for the directory served.
        :param root_client_path: stored and handed to every protocol object
            created by _build_protocol.
        """
        # backing_transport could be passed to serve instead of __init__
        self.backing_transport = backing_transport
        self.root_client_path = root_client_path
        self.finished = False
        self._push_back_buffer = None

    def _push_back(self, bytes):
        """Return unused bytes to the medium, because they belong to the next
        request(s).

        This sets the _push_back_buffer to the given bytes.  An empty string
        is ignored, because an empty buffer could later be confused with EOF.

        :param bytes: the surplus str of bytes to keep for the next read.
        :raises AssertionError: if previously pushed back bytes have not been
            consumed yet.
        """
        if self._push_back_buffer is not None:
            raise AssertionError(
                "_push_back called when self._push_back_buffer is %r"
                % (self._push_back_buffer,))
        if bytes == '':
            return
        self._push_back_buffer = bytes

    def _get_push_back_buffer(self):
        """Hand back the pushed back bytes and clear the buffer.

        :returns: the value of _push_back_buffer before it was cleared.
        :raises AssertionError: if the buffer holds the empty string.
        """
        if self._push_back_buffer == '':
            raise AssertionError(
                '%s._push_back_buffer should never be the empty string, '
                'which can be confused with EOF' % (self,))
        bytes = self._push_back_buffer
        self._push_back_buffer = None
        return bytes

    def serve(self):
        """Serve requests until the client disconnects."""
        # Keep local references because the sys module's globals get set to
        # None during interpreter shutdown.  exc_info() is used rather than
        # binding the exception in the except clause so that this parses on
        # every Python version.
        from sys import exc_info, stderr
        try:
            while not self.finished:
                server_protocol = self._build_protocol()
                self._serve_one_request(server_protocol)
        except Exception:
            stderr.write("%s terminating on exception %s\n"
                         % (self, exc_info()[1]))
            raise

    def _build_protocol(self):
        """Identifies the version of the incoming request, and returns a
        protocol object that can interpret it.

        If more bytes than the version prefix of the request are read, they will
        be fed into the protocol before it is returned.

        :returns: a SmartServerRequestProtocol.
        """
        bytes = self._get_line()
        protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
        protocol = protocol_factory(
            self.backing_transport, self._write_out, self.root_client_path)
        protocol.accept_bytes(unused_bytes)
        return protocol

    def _serve_one_request(self, protocol):
        """Read one request from input, process, send back a response.

        KeyboardInterrupt is propagated; any other Exception is swallowed and
        reported through terminate_due_to_error.

        :param protocol: a SmartServerRequestProtocol.
        """
        try:
            self._serve_one_request_unguarded(protocol)
        except KeyboardInterrupt:
            raise
        except Exception:
            self.terminate_due_to_error()

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        raise NotImplementedError(self.terminate_due_to_error)

    def _get_bytes(self, desired_count):
        """Get some bytes from the medium.

        :param desired_count: number of bytes we want to read.
        """
        raise NotImplementedError(self._get_bytes)

    def _get_line(self):
        """Read bytes from this request's response until a newline byte.

        This isn't particularly efficient, so should only be used when the
        expected size of the line is quite short.

        :returns: a string of bytes ending in a newline (byte 0x0A), or the
            incomplete line read so far if the medium ran out of bytes first.
        """
        newline_pos = -1
        bytes = ''
        while newline_pos == -1:
            new_bytes = self._get_bytes(1)
            bytes += new_bytes
            if new_bytes == '':
                # Ran out of bytes before receiving a complete line.
                return bytes
            newline_pos = bytes.find('\n')
        line = bytes[:newline_pos+1]
        # _get_bytes may return more than was asked for; keep the surplus for
        # whoever reads next.
        self._push_back(bytes[newline_pos+1:])
        return line
205
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
    """Smart server medium that exchanges bytes with its client on a socket."""

    def __init__(self, sock, backing_transport, root_client_path='/'):
        """Constructor.

        :param sock: the socket the server will read from.  It will be put
            into blocking mode.
        :param backing_transport: Transport for the directory served.
        :param root_client_path: passed through to SmartServerStreamMedium.
        """
        SmartServerStreamMedium.__init__(
            self, backing_transport, root_client_path=root_client_path)
        sock.setblocking(True)
        self.socket = sock

    def _serve_one_request_unguarded(self, protocol):
        """Feed received bytes to protocol until it wants no more.

        :param protocol: a SmartServerRequestProtocol.
        """
        while protocol.next_read_size():
            bytes = self._get_bytes(4096)
            if bytes == '':
                # An empty read means the client has gone away.
                self.finished = True
                return
            protocol.accept_bytes(bytes)

        # Bytes the protocol did not consume belong to the next request.
        self._push_back(protocol.unused_data)

    def _get_bytes(self, desired_count):
        """Return pushed back bytes if there are any, else read the socket.

        :param desired_count: ignored, see the comment in the body.
        """
        if self._push_back_buffer is not None:
            return self._get_push_back_buffer()
        # We ignore the desired_count because on sockets it's more efficient to
        # read a fixed 4096 byte chunk at a time.
        return self.socket.recv(4096)

    def terminate_due_to_error(self):
        """Close the socket and stop the serve loop."""
        # TODO: This should log to a server log file, but no such thing
        # exists yet.  Andrew Bennetts 2006-09-29.
        # NOTE(review): body restored by analogy with the pipe medium; confirm
        # against the upstream file.
        self.socket.close()
        self.finished = True

    def _write_out(self, bytes):
        """Send all of bytes to the client."""
        osutils.send_all(self.socket, bytes)
245
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
    """Smart server medium that reads requests from one file object and
    writes responses to another.
    """

    def __init__(self, in_file, out_file, backing_transport):
        """Construct new server.

        :param in_file: Python file from which requests can be read.
        :param out_file: Python file to write responses.
        :param backing_transport: Transport for the directory served.
        """
        SmartServerStreamMedium.__init__(self, backing_transport)
        if sys.platform == 'win32':
            # force binary mode for files
            import msvcrt
            for f in (in_file, out_file):
                # Objects without a fileno attribute are left alone.
                fileno = getattr(f, 'fileno', None)
                if fileno:
                    msvcrt.setmode(fileno(), os.O_BINARY)
        self._in = in_file
        self._out = out_file

    def _serve_one_request_unguarded(self, protocol):
        """Feed input to protocol until the request is done or input ends.

        :param protocol: a SmartServerRequestProtocol.
        """
        while True:
            bytes_to_read = protocol.next_read_size()
            if bytes_to_read == 0:
                # Finished serving this request.
                self._out.flush()
                return
            bytes = self._get_bytes(bytes_to_read)
            if bytes == '':
                # Connection has been closed.
                self.finished = True
                self._out.flush()
                return
            protocol.accept_bytes(bytes)

    def _get_bytes(self, desired_count):
        """Return pushed back bytes if there are any, else read the input.

        :param desired_count: number of bytes to ask the input file for.
        """
        if self._push_back_buffer is not None:
            return self._get_push_back_buffer()
        return self._in.read(desired_count)

    def terminate_due_to_error(self):
        """Close the output file and stop the serve loop."""
        # TODO: This should log to a server log file, but no such thing
        # exists yet.  Andrew Bennetts 2006-09-29.
        # NOTE(review): body restored by analogy with the socket medium;
        # confirm against the upstream file.
        self._out.close()
        self.finished = True

    def _write_out(self, bytes):
        """Write bytes to the output file (flushed when a request ends)."""
        self._out.write(bytes)
295
class SmartClientMediumRequest(object):
    """A request on a SmartClientMedium.

    Each request allows bytes to be provided to it via accept_bytes, and then
    the response bytes to be read via read_bytes.

    For example::

        request.accept_bytes('123')
        request.finished_writing()
        result = request.read_bytes(3)
        request.finished_reading()

    It is up to the individual SmartClientMedium whether multiple concurrent
    requests can exist. See SmartClientMedium.get_request to obtain instances
    of SmartClientMediumRequest, and the concrete Medium you are using for
    details on concurrency and pipelining.

    The public methods enforce the "writing" -> "reading" -> "done" state
    sequence; subclasses implement the underscore-prefixed helpers.
    """

    def __init__(self, medium):
        """Construct a SmartClientMediumRequest for the medium medium."""
        self._medium = medium
        # we track state by constants - we may want to use the same
        # pattern as BodyReader if it gets more complex.
        # valid states are: "writing", "reading", "done"
        self._state = "writing"

    def accept_bytes(self, bytes):
        """Accept bytes for inclusion in this request.

        This method may not be called after finished_writing() has been
        called.  It depends upon the Medium whether or not the bytes will be
        immediately transmitted. Message based Mediums will tend to buffer the
        bytes until finished_writing() is called.

        :param bytes: A bytestring.
        :raises errors.WritingCompleted: if the request has left the
            "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._accept_bytes(bytes)

    def _accept_bytes(self, bytes):
        """Helper for accept_bytes.

        Accept_bytes checks the state of the request to determine if bytes
        should be accepted. After that it hands off to _accept_bytes to do the
        actual acceptance.
        """
        raise NotImplementedError(self._accept_bytes)

    def finished_reading(self):
        """Inform the request that all desired data has been read.

        This will remove the request from the pipeline for its medium (if the
        medium supports pipelining) and any further calls to methods on the
        request will raise ReadingCompleted.

        :raises errors.WritingNotComplete: if finished_writing() has not been
            called yet.
        :raises errors.ReadingCompleted: if reading was already finished.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        # Move to the terminal state before delegating, so a second call is
        # rejected by the check above.
        self._state = "done"
        self._finished_reading()

    def _finished_reading(self):
        """Helper for finished_reading.

        finished_reading checks the state of the request to determine if
        finished_reading is allowed, and if it is hands off to _finished_reading
        to perform the action.
        """
        raise NotImplementedError(self._finished_reading)

    def finished_writing(self):
        """Finish the writing phase of this request.

        This will flush all pending data for this request along the medium.
        After calling finished_writing, you may not call accept_bytes anymore.

        :raises errors.WritingCompleted: if the request has left the
            "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._state = "reading"
        self._finished_writing()

    def _finished_writing(self):
        """Helper for finished_writing.

        finished_writing checks the state of the request to determine if
        finished_writing is allowed, and if it is hands off to _finished_writing
        to perform the action.
        """
        raise NotImplementedError(self._finished_writing)

    def read_bytes(self, count):
        """Read bytes from this request's response.

        This method will block and wait for count bytes to be read. It may not
        be invoked until finished_writing() has been called - this is to ensure
        a message-based approach to requests, for compatibility with message
        based mediums like HTTP.

        :param count: number of bytes to read.
        :raises errors.WritingNotComplete: if finished_writing() has not been
            called yet.
        :raises errors.ReadingCompleted: if finished_reading() was already
            called.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        return self._read_bytes(count)

    def _read_bytes(self, count):
        """Helper for read_bytes.

        read_bytes checks the state of the request to determine if bytes
        should be read. After that it hands off to _read_bytes to do the
        actual read.
        """
        raise NotImplementedError(self._read_bytes)

    def read_line(self):
        """Read bytes from this request's response until a newline byte.

        This isn't particularly efficient, so should only be used when the
        expected size of the line is quite short.

        :returns: a string of bytes ending in a newline (byte 0x0A).
        :raises errors.ConnectionReset: if the response ends before a newline
            is seen.
        """
        # XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
        line = ''
        while not line or line[-1] != '\n':
            new_char = self.read_bytes(1)
            line += new_char
            if new_char == '':
                # end of file encountered reading from server
                raise errors.ConnectionReset(
                    "please check connectivity and permissions",
                    "(and try -Dhpss if further diagnosis is required)")
        return line
431
class SmartClientMedium(object):
    """Smart client is a medium for sending smart protocol requests over.

    Subclasses must provide get_request(), which protocol_version() calls but
    this class does not define.
    """

    def __init__(self):
        super(SmartClientMedium, self).__init__()
        # The outcome of the 'hello' probe is cached: either _done_hello
        # becomes True or the failure is remembered for re-raising.
        self._protocol_version_error = None
        self._protocol_version = None
        self._done_hello = False

    def protocol_version(self):
        """Find out if 'hello' smart request works.

        The 'hello' request is sent at most once per medium; both success and
        a SmartProtocolError failure are cached.

        :raises errors.SmartProtocolError: if the 'hello' request failed, now
            or on an earlier call.
        """
        if self._protocol_version_error is not None:
            raise self._protocol_version_error
        if not self._done_hello:
            try:
                medium_request = self.get_request()
                # Send a 'hello' request in protocol version one, for maximum
                # backwards compatibility.
                client_protocol = SmartClientRequestProtocolOne(medium_request)
                client_protocol.query_version()
                self._done_hello = True
            except errors.SmartProtocolError:
                # Cache the error, just like we would cache a successful
                # result.  exc_info() is used instead of binding the exception
                # in the except clause so this parses on every Python version.
                from sys import exc_info
                self._protocol_version_error = exc_info()[1]
                raise
        # NOTE(review): the returned constant is restored from the upstream
        # file; confirm the value.
        return '2'

    def should_probe(self):
        """Should RemoteBzrDirFormat.probe_transport send a smart request on
        this medium?

        Some transports are unambiguously smart-only; there's no need to check
        if the transport is able to carry smart requests, because that's all
        it is for.  In those cases, this method should return False.

        But some HTTP transports can sometimes fail to carry smart requests,
        but still be usable for accessing remote bzrdirs via plain file
        accesses.  So for those transports, their media should return True here
        so that RemoteBzrDirFormat can determine if it is appropriate for that
        transport.
        """
        return False

    def disconnect(self):
        """If this medium maintains a persistent connection, close it.

        The default implementation does nothing.
        """
482
class SmartClientStreamMedium(SmartClientMedium):
    """Stream based medium common class.

    SmartClientStreamMediums operate on a stream. All subclasses use a common
    SmartClientStreamMediumRequest for their requests, and should implement
    _accept_bytes and _read_bytes to allow the request objects to send and
    receive bytes.
    """

    def __init__(self):
        SmartClientMedium.__init__(self)
        # The request currently using the stream, or None when it is free.
        self._current_request = None
        # Be optimistic: we assume the remote end can accept new remote
        # requests until we get an error saying otherwise.  (1.2 adds some
        # requests that send bodies, which confuses older servers.)
        self._remote_is_at_least_1_2 = True

    def accept_bytes(self, bytes):
        """Send bytes by handing them to the subclass's _accept_bytes."""
        self._accept_bytes(bytes)

    def __del__(self):
        """The SmartClientStreamMedium knows how to close the stream when it is
        finished with it.
        """
        self.disconnect()

    def _flush(self):
        """Flush the output stream.

        This method is used by the SmartClientStreamMediumRequest to ensure that
        all data for a request is sent, to avoid long timeouts or deadlocks.
        """
        raise NotImplementedError(self._flush)

    def get_request(self):
        """See SmartClientMedium.get_request().

        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
        for get_request.
        """
        return SmartClientStreamMediumRequest(self)

    def read_bytes(self, count):
        """Read by handing count to the subclass's _read_bytes."""
        return self._read_bytes(count)
528
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
    """A client medium using simple pipes.

    This client does not manage the pipes: it assumes they will always be open.
    """

    def __init__(self, readable_pipe, writeable_pipe):
        """Create a medium over an already open pair of pipes.

        :param readable_pipe: file-like object responses are read from.
        :param writeable_pipe: file-like object requests are written to.
        """
        SmartClientStreamMedium.__init__(self)
        self._readable_pipe = readable_pipe
        self._writeable_pipe = writeable_pipe

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._writeable_pipe.write(bytes)

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._writeable_pipe.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium._read_bytes."""
        return self._readable_pipe.read(count)
552
class SmartSSHClientMedium(SmartClientStreamMedium):
    """A client medium using SSH."""

    def __init__(self, host, port=None, username=None, password=None,
            vendor=None, bzr_remote_path=None):
        """Creates a client that will connect on the first use.

        :param host: passed to the ssh vendor when connecting.
        :param port: passed to the ssh vendor when connecting.
        :param username: passed to the ssh vendor when connecting.
        :param password: passed to the ssh vendor when connecting.
        :param vendor: An optional override for the ssh vendor to use. See
            bzrlib.transport.ssh for details on ssh vendors.
        :param bzr_remote_path: the command to run on the remote side.
            Omitting it is deprecated; the BZR_REMOTE_PATH environment
            variable, or 'bzr', is used in that case.
        """
        SmartClientStreamMedium.__init__(self)
        self._connected = False
        self._host = host
        self._password = password
        self._port = port
        self._username = username
        self._read_from = None
        self._ssh_connection = None
        self._vendor = vendor
        self._write_to = None
        self._bzr_remote_path = bzr_remote_path
        if self._bzr_remote_path is None:
            symbol_versioning.warn(
                'bzr_remote_path is required as of bzr 0.92',
                DeprecationWarning, stacklevel=2)
            self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._ensure_connection()
        self._write_to.write(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._read_from.close()
        self._write_to.close()
        self._ssh_connection.close()
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected."""
        if self._connected:
            return
        if self._vendor is None:
            vendor = ssh._get_ssh_vendor()
        else:
            vendor = self._vendor
        self._ssh_connection = vendor.connect_ssh(self._username,
                self._password, self._host, self._port,
                command=[self._bzr_remote_path, 'serve', '--inet',
                         '--directory=/', '--allow-writes'])
        self._read_from, self._write_to = \
            self._ssh_connection.get_filelike_channels()
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._write_to.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium.read_bytes.

        :raises errors.MediumNotConnected: if nothing has been sent yet, so
            no connection exists to read from.
        """
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._read_from.read(count)
620
# 4155 is the IANA-registered default port for bzr://.
BZR_DEFAULT_PORT = 4155
# '0.0.0.0' is the IPv4 wildcard address.
BZR_DEFAULT_INTERFACE = '0.0.0.0'
625
class SmartTCPClientMedium(SmartClientStreamMedium):
    """A client medium using TCP."""

    def __init__(self, host, port):
        """Creates a client that will connect on the first use.

        :param host: host to connect to.
        :param port: port to connect to; None selects BZR_DEFAULT_PORT.
        """
        SmartClientStreamMedium.__init__(self)
        self._connected = False
        self._host = host
        self._port = port
        self._socket = None

    def _accept_bytes(self, bytes):
        """See SmartClientMedium.accept_bytes."""
        self._ensure_connection()
        osutils.send_all(self._socket, bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._socket.close()
        self._socket = None
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected.

        :raises errors.ConnectionError: if the socket cannot be connected.
        """
        if self._connected:
            return
        self._socket = socket.socket()
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        if self._port is None:
            port = BZR_DEFAULT_PORT
        else:
            port = int(self._port)
        try:
            self._socket.connect((self._host, port))
        except socket.error:
            # exc_info() is used instead of binding the exception in the
            # except clause so this parses on every Python version.
            from sys import exc_info
            err = exc_info()[1]
            # socket errors either have a (string) or (errno, string) as their
            # args.  args is always a tuple, so choose by its length; testing
            # its type for str never matches and made one-element errors fail
            # with IndexError instead of ConnectionError.
            if len(err.args) > 1:
                err_msg = err.args[1]
            elif err.args:
                err_msg = err.args[0]
            else:
                err_msg = str(err)
            # Do not keep the unusable socket around; the next attempt
            # creates a fresh one.
            self._socket.close()
            self._socket = None
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
                    (self._host, port, err_msg))
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush().

        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
        add a means to do a flush, but that can be done in the future.
        """

    def _read_bytes(self, count):
        """See SmartClientMedium.read_bytes.

        :raises errors.MediumNotConnected: if no connection has been made.
        """
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._socket.recv(count)
686
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
    """The request type used with a SmartClientStreamMedium.

    Every operation is forwarded to the medium's own stream, and only one
    request may be registered on a medium at a time.
    """

    def __init__(self, medium):
        """Register this request as the medium's current request.

        :param medium: the SmartClientStreamMedium to operate on.
        :raises errors.TooManyConcurrentRequests: if the medium already has a
            current request.
        """
        SmartClientMediumRequest.__init__(self, medium)
        stream = self._medium
        # Concurrency guard.  Should some stream ever allow concurrent
        # requests (i.e. via multiplexing), this check and the bookkeeping of
        # _current_request belong in SmartClientStreamMedium.get_request
        # instead: but it is unneeded overhead for now.  RBC 20060922
        if stream._current_request is not None:
            raise errors.TooManyConcurrentRequests(stream)
        stream._current_request = self

    def _accept_bytes(self, bytes):
        """See SmartClientMediumRequest._accept_bytes.

        The bytes go straight to the medium's _accept_bytes, since the stream
        belongs to the medium.
        """
        stream = self._medium
        stream._accept_bytes(bytes)

    def _finished_reading(self):
        """See SmartClientMediumRequest._finished_reading.

        Releases the medium by resetting its _current_request, so that a new
        request can be created.

        :raises AssertionError: if this request is not the medium's current
            request.
        """
        stream = self._medium
        if stream._current_request is self:
            stream._current_request = None
            return
        raise AssertionError()

    def _finished_writing(self):
        """See SmartClientMediumRequest._finished_writing.

        Flushes the medium so that every byte of the request is transmitted.
        """
        stream = self._medium
        stream._flush()

    def _read_bytes(self, count):
        """See SmartClientMediumRequest._read_bytes.

        The read is served by the medium's _read_bytes, since the stream
        belongs to the medium.
        """
        stream = self._medium
        return stream._read_bytes(count)