# Copyright (C) 2006 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""The 'medium' layer for the smart servers and clients.

"Medium" here is the noun meaning "a means of transmission", not the adjective
for "the quality between big and small."

Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
over SSH), and pass them to and from the protocol logic. See the overview in
bzrlib/transport/smart/__init__.py.
"""

import os
import socket
import sys

from bzrlib import (
    errors,
    osutils,
    symbol_versioning,
    )
from bzrlib.smart.protocol import (
    MESSAGE_VERSION_THREE,
    REQUEST_VERSION_TWO,
    SmartClientRequestProtocolOne,
    SmartServerRequestProtocolOne,
    SmartServerRequestProtocolTwo,
    build_server_protocol_three
    )
from bzrlib.transport import ssh
def _get_protocol_factory_for_bytes(bytes):
    """Determine the right protocol factory for 'bytes'.

    The start of 'bytes' is compared against the known protocol version
    markers.  When a marker matches, the corresponding factory is chosen and
    the marker is stripped from the returned bytes.  When no marker matches,
    the version-one factory is chosen and the bytes are returned untouched.

    The bytes should have at least one newline byte (i.e. be a whole line),
    otherwise a truncated marker will not match and the request will be
    treated as version one.

    Typical use would be::

         factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
         server_protocol = factory(transport, write_func, root_client_path)
         server_protocol.accept_bytes(unused_bytes)

    :param bytes: a str of bytes of the start of the request.
    :returns: 2-tuple of (protocol_factory, unused_bytes).  protocol_factory is
        a callable that takes three args: transport, write_func,
        root_client_path.  unused_bytes are any bytes that were not part of a
        protocol version marker.
    """
    # Checked in order: version three first, then version two.
    versioned_factories = (
        (MESSAGE_VERSION_THREE, build_server_protocol_three),
        (REQUEST_VERSION_TWO, SmartServerRequestProtocolTwo),
        )
    for marker, protocol_factory in versioned_factories:
        if bytes.startswith(marker):
            return protocol_factory, bytes[len(marker):]
    # No version marker at all: fall back to the original protocol.
    return SmartServerRequestProtocolOne, bytes
class SmartServerStreamMedium(object):
    """Handles smart commands coming over a stream.

    The stream may be a pipe connected to sshd, or a tcp socket, or an
    in-process fifo for testing.

    One instance is created for each connected client; it can serve multiple
    requests in the lifetime of the connection.

    The server passes requests through to an underlying backing transport,
    which will typically be a LocalTransport looking at the server's filesystem.

    Subclasses provide _serve_one_request_unguarded, _get_bytes, _write_out
    and terminate_due_to_error.

    :ivar _push_back_buffer: a str of bytes that have been read from the stream
        but not used yet, or None if there are no buffered bytes.  Subclasses
        should make sure to exhaust this buffer before reading more bytes from
        the stream.  See also the _push_back method.
    """

    def __init__(self, backing_transport, root_client_path='/'):
        """Construct new server.

        :param backing_transport: Transport for the directory served.
        :param root_client_path: handed unchanged to the protocol factory as
            its third argument.
        """
        # backing_transport could be passed to serve instead of __init__
        self.backing_transport = backing_transport
        self.root_client_path = root_client_path
        self.finished = False
        self._push_back_buffer = None

    def _push_back(self, bytes):
        """Return unused bytes to the medium, because they belong to the next
        request(s).

        This sets the _push_back_buffer to the given bytes.  An empty string
        is ignored, because an empty buffer could be confused with EOF.

        :raises AssertionError: if there are already pushed-back bytes.
        """
        if self._push_back_buffer is not None:
            raise AssertionError(
                "_push_back called when self._push_back_buffer is %r"
                % (self._push_back_buffer,))
        if bytes == '':
            return
        self._push_back_buffer = bytes

    def _get_push_back_buffer(self):
        """Return the pushed-back bytes and clear the buffer.

        :raises AssertionError: if the buffer holds the empty string.
        """
        if self._push_back_buffer == '':
            raise AssertionError(
                '%s._push_back_buffer should never be the empty string, '
                'which can be confused with EOF' % (self,))
        pending = self._push_back_buffer
        self._push_back_buffer = None
        return pending

    def serve(self):
        """Serve requests until the client disconnects."""
        # Keep references to these because the sys module's globals get set to
        # None during interpreter shutdown.
        from sys import exc_info, stderr
        try:
            while not self.finished:
                server_protocol = self._build_protocol()
                self._serve_one_request(server_protocol)
        except Exception:
            # exc_info() rather than "except Exception, e" so that this parses
            # on every Python version.
            stderr.write("%s terminating on exception %s\n"
                         % (self, exc_info()[1]))
            raise

    def _build_protocol(self):
        """Identifies the version of the incoming request, and returns a
        protocol object that can interpret it.

        If more bytes than the version prefix of the request are read, they will
        be fed into the protocol before it is returned.

        :returns: a SmartServerRequestProtocol.
        """
        first_line = self._get_line()
        protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(
            first_line)
        protocol = protocol_factory(
            self.backing_transport, self._write_out, self.root_client_path)
        protocol.accept_bytes(unused_bytes)
        return protocol

    def _serve_one_request(self, protocol):
        """Read one request from input, process, send back a response.

        KeyboardInterrupt propagates; any other Exception is handled by
        calling terminate_due_to_error().

        :param protocol: a SmartServerRequestProtocol.
        """
        try:
            self._serve_one_request_unguarded(protocol)
        except KeyboardInterrupt:
            raise
        except Exception:
            self.terminate_due_to_error()

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        raise NotImplementedError(self.terminate_due_to_error)

    def _get_bytes(self, desired_count):
        """Get some bytes from the medium.

        :param desired_count: number of bytes we want to read.
        """
        raise NotImplementedError(self._get_bytes)

    def _get_line(self):
        """Read bytes from this request's response until a newline byte.

        This isn't particularly efficient, so should only be used when the
        expected size of the line is quite short.

        Anything read past the newline is handed to _push_back.

        :returns: a string of bytes ending in a newline (byte 0x0A), or
            whatever was read so far if the medium ran out of bytes first.
        """
        newline_pos = -1
        received = ''
        while newline_pos == -1:
            new_bytes = self._get_bytes(1)
            received += new_bytes
            if new_bytes == '':
                # Ran out of bytes before receiving a complete line.
                return received
            newline_pos = received.find('\n')
        line = received[:newline_pos+1]
        self._push_back(received[newline_pos+1:])
        return line
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
    """A server stream medium that reads from and writes to a socket."""

    def __init__(self, sock, backing_transport, root_client_path='/'):
        """Constructor.

        :param sock: the socket the server will read from.  It will be put
            into blocking mode.
        :param backing_transport: Transport for the directory served.
        :param root_client_path: passed on to SmartServerStreamMedium.
        """
        SmartServerStreamMedium.__init__(
            self, backing_transport, root_client_path=root_client_path)
        sock.setblocking(True)
        self.socket = sock

    def _serve_one_request_unguarded(self, protocol):
        """Feed bytes to 'protocol' until it asks for no more.

        An empty read marks the medium as finished.  Otherwise the protocol's
        unused_data is pushed back for the next request.
        """
        while protocol.next_read_size():
            received = self._get_bytes(4096)
            if received == '':
                # The peer closed the connection.
                self.finished = True
                return
            protocol.accept_bytes(received)

        self._push_back(protocol.unused_data)

    def _get_bytes(self, desired_count):
        """Return pushed-back bytes if there are any, else read the socket."""
        if self._push_back_buffer is not None:
            return self._get_push_back_buffer()
        # We ignore the desired_count because on sockets it's more efficient to
        # read 4k at a time.
        return self.socket.recv(4096)

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        # TODO: This should log to a server log file, but no such thing
        # exists yet.  Andrew Bennetts 2006-09-29.
        # NOTE(review): the two statements below were restored from dropped
        # lines; confirm against upstream.
        self.socket.close()
        self.finished = True

    def _write_out(self, bytes):
        """Send all of 'bytes' to the socket."""
        osutils.send_all(self.socket, bytes)
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
    """A server stream medium that reads and writes a pair of file objects."""

    def __init__(self, in_file, out_file, backing_transport):
        """Construct new server.

        :param in_file: Python file from which requests can be read.
        :param out_file: Python file to write responses.
        :param backing_transport: Transport for the directory served.
        """
        SmartServerStreamMedium.__init__(self, backing_transport)
        if sys.platform == 'win32':
            # force binary mode for files
            import msvcrt
            for f in (in_file, out_file):
                # Not every file-like object has a file descriptor.
                fileno = getattr(f, 'fileno', None)
                if fileno:
                    msvcrt.setmode(fileno(), os.O_BINARY)
        self._in = in_file
        self._out = out_file

    def _serve_one_request_unguarded(self, protocol):
        """Feed bytes to 'protocol' until the request is done or input ends.

        NOTE(review): the flush/return statements were restored from dropped
        lines; confirm against upstream.
        """
        while True:
            bytes_to_read = protocol.next_read_size()
            if bytes_to_read == 0:
                # Finished serving this request.
                self._out.flush()
                return
            received = self._get_bytes(bytes_to_read)
            if received == '':
                # Connection has been closed.
                self.finished = True
                self._out.flush()
                return
            protocol.accept_bytes(received)

    def _get_bytes(self, desired_count):
        """Return pushed-back bytes if there are any, else read the input."""
        if self._push_back_buffer is not None:
            return self._get_push_back_buffer()
        return self._in.read(desired_count)

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        # TODO: This should log to a server log file, but no such thing
        # exists yet.  Andrew Bennetts 2006-09-29.
        # NOTE(review): the two statements below were restored from dropped
        # lines; confirm against upstream.
        self._out.close()
        self.finished = True

    def _write_out(self, bytes):
        """Write 'bytes' to the output file."""
        self._out.write(bytes)
class SmartClientMediumRequest(object):
    """A request on a SmartClientMedium.

    Each request allows bytes to be provided to it via accept_bytes, and then
    the response bytes to be read via read_bytes.

    For instance::

        request.accept_bytes('123')
        request.finished_writing()
        result = request.read_bytes(3)
        request.finished_reading()

    It is up to the individual SmartClientMedium whether multiple concurrent
    requests can exist. See SmartClientMedium.get_request to obtain instances
    of SmartClientMediumRequest, and the concrete Medium you are using for
    details on concurrency and pipelining.
    """

    def __init__(self, medium):
        """Construct a SmartClientMediumRequest for the medium medium."""
        self._medium = medium
        # we track state by constants - we may want to use the same
        # pattern as BodyReader if it gets more complex.
        # valid states are: "writing", "reading", "done"
        self._state = "writing"

    def accept_bytes(self, bytes):
        """Accept bytes for inclusion in this request.

        This method may not be called after finished_writing() has been
        called.  It depends upon the Medium whether or not the bytes will be
        immediately transmitted. Message based Mediums will tend to buffer the
        bytes until finished_writing() is called.

        :param bytes: A bytestring.
        :raises errors.WritingCompleted: if the request is no longer in the
            "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._accept_bytes(bytes)

    def _accept_bytes(self, bytes):
        """Helper for accept_bytes.

        accept_bytes checks the state of the request to determine if bytes
        should be accepted. After that it hands off to _accept_bytes to do the
        actual acceptance.
        """
        raise NotImplementedError(self._accept_bytes)

    def finished_reading(self):
        """Inform the request that all desired data has been read.

        This will remove the request from the pipeline for its medium (if the
        medium supports pipelining) and any further calls to methods on the
        request will raise ReadingCompleted.

        :raises errors.WritingNotComplete: if still in the "writing" state.
        :raises errors.ReadingCompleted: if not in the "reading" state.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        self._state = "done"
        self._finished_reading()

    def _finished_reading(self):
        """Helper for finished_reading.

        finished_reading checks the state of the request to determine if
        finished_reading is allowed, and if it is hands off to _finished_reading
        to perform the action.
        """
        raise NotImplementedError(self._finished_reading)

    def finished_writing(self):
        """Finish the writing phase of this request.

        This will flush all pending data for this request along the medium.
        After calling finished_writing, you may not call accept_bytes anymore.

        :raises errors.WritingCompleted: if not in the "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._state = "reading"
        self._finished_writing()

    def _finished_writing(self):
        """Helper for finished_writing.

        finished_writing checks the state of the request to determine if
        finished_writing is allowed, and if it is hands off to _finished_writing
        to perform the action.
        """
        raise NotImplementedError(self._finished_writing)

    def read_bytes(self, count):
        """Read bytes from this request's response.

        This method will block and wait for count bytes to be read. It may not
        be invoked until finished_writing() has been called - this is to ensure
        a message-based approach to requests, for compatibility with message
        based mediums like HTTP.

        :raises errors.WritingNotComplete: if still in the "writing" state.
        :raises errors.ReadingCompleted: if not in the "reading" state.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        return self._read_bytes(count)

    def _read_bytes(self, count):
        """Helper for read_bytes.

        read_bytes checks the state of the request to determine if bytes
        should be read. After that it hands off to _read_bytes to do the
        actual read.
        """
        raise NotImplementedError(self._read_bytes)

    def read_line(self):
        """Read bytes from this request's response until a newline byte.

        This isn't particularly efficient, so should only be used when the
        expected size of the line is quite short.

        :returns: a string of bytes ending in a newline (byte 0x0A).
        :raises errors.ConnectionReset: if the response ends before a newline
            is seen.
        """
        # XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
        line = ''
        while not line or line[-1] != '\n':
            new_char = self.read_bytes(1)
            line += new_char
            if new_char == '':
                # end of file encountered reading from server
                raise errors.ConnectionReset(
                    "please check connectivity and permissions",
                    "(and try -Dhpss if further diagnosis is required)")
        return line
class SmartClientMedium(object):
    """Smart client is a medium for sending smart protocol requests over."""

    def __init__(self, base):
        """Set up the version-probe cache for this medium.

        :param base: stored on the instance as ``base``.
        """
        super(SmartClientMedium, self).__init__()
        # NOTE(review): restored from a dropped line; confirm against upstream.
        self.base = base
        self._protocol_version_error = None
        self._protocol_version = None
        self._done_hello = False

    def protocol_version(self):
        """Find out if 'hello' smart request works.

        The 'hello' request is sent at most once per medium.  Both success and
        a SmartProtocolError are remembered, so later calls either return
        immediately or re-raise the cached error.
        """
        if self._protocol_version_error is not None:
            raise self._protocol_version_error
        if not self._done_hello:
            try:
                medium_request = self.get_request()
                # Send a 'hello' request in protocol version one, for maximum
                # backwards compatibility.
                client_protocol = SmartClientRequestProtocolOne(medium_request)
                client_protocol.query_version()
                self._done_hello = True
            except errors.SmartProtocolError:
                # Cache the error, just like we would cache a successful
                # result.  exc_info() rather than "except ..., e" so that this
                # parses on every Python version.
                self._protocol_version_error = sys.exc_info()[1]
                raise
        # NOTE(review): return value restored from a dropped line; confirm
        # against upstream.
        return '2'

    def should_probe(self):
        """Should RemoteBzrDirFormat.probe_transport send a smart request on
        this medium?

        Some transports are unambiguously smart-only; there's no need to check
        if the transport is able to carry smart requests, because that's all
        it is for.  In those cases, this method should return False.

        But some HTTP transports can sometimes fail to carry smart requests,
        but still be usable for accessing remote bzrdirs via plain file
        accesses.  So for those transports, their media should return True here
        so that RemoteBzrDirFormat can determine if it is appropriate for that
        transport.
        """
        # NOTE(review): default restored from a dropped line; confirm against
        # upstream.
        return False

    def disconnect(self):
        """If this medium maintains a persistent connection, close it.

        The default implementation does nothing.
        """
class SmartClientStreamMedium(SmartClientMedium):
    """Stream based medium common class.

    SmartClientStreamMediums operate on a stream. All subclasses use a common
    SmartClientStreamMediumRequest for their requests, and should implement
    _accept_bytes and _read_bytes to allow the request objects to send and
    receive bytes.
    """

    def __init__(self, base):
        """Initialise the medium with no request in progress."""
        SmartClientMedium.__init__(self, base)
        self._current_request = None
        # Be optimistic: we assume the remote end can accept new remote
        # requests until we get an error saying otherwise.  (1.2 adds some
        # requests that send bodies, which confuses older servers.)
        self._remote_is_at_least_1_2 = True

    def accept_bytes(self, bytes):
        """Hand 'bytes' to the subclass's _accept_bytes."""
        self._accept_bytes(bytes)

    def __del__(self):
        """The SmartClientStreamMedium knows how to close the stream when it is
        finished with it.
        """
        # NOTE(review): method header and body restored from dropped lines;
        # confirm against upstream.
        self.disconnect()

    def _flush(self):
        """Flush the output stream.

        This method is used by the SmartClientStreamMediumRequest to ensure that
        all data for a request is sent, to avoid long timeouts or deadlocks.
        """
        raise NotImplementedError(self._flush)

    def get_request(self):
        """See SmartClientMedium.get_request().

        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
        for get_request.
        """
        return SmartClientStreamMediumRequest(self)

    def read_bytes(self, count):
        """Return whatever the subclass's _read_bytes returns for 'count'."""
        return self._read_bytes(count)
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
    """A client medium using simple pipes.

    This client does not manage the pipes: it assumes they will always be open.
    """

    def __init__(self, readable_pipe, writeable_pipe, base):
        """Remember the pipe to read responses from and the one to write to."""
        SmartClientStreamMedium.__init__(self, base)
        self._readable_pipe = readable_pipe
        self._writeable_pipe = writeable_pipe

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._writeable_pipe.write(bytes)

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._writeable_pipe.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium._read_bytes."""
        return self._readable_pipe.read(count)
class SmartSSHClientMedium(SmartClientStreamMedium):
    """A client medium using SSH."""

    def __init__(self, host, port=None, username=None, password=None,
            base=None, vendor=None, bzr_remote_path=None):
        """Creates a client that will connect on the first use.

        :param vendor: An optional override for the ssh vendor to use. See
            bzrlib.transport.ssh for details on ssh vendors.
        :param bzr_remote_path: the command to run on the remote host.
            Omitting it is deprecated: a DeprecationWarning is issued and the
            BZR_REMOTE_PATH environment variable (default 'bzr') is used.
        """
        SmartClientStreamMedium.__init__(self, base)
        self._connected = False
        self._host = host
        self._password = password
        self._port = port
        self._username = username
        self._read_from = None
        self._ssh_connection = None
        self._vendor = vendor
        self._write_to = None
        self._bzr_remote_path = bzr_remote_path
        if self._bzr_remote_path is None:
            symbol_versioning.warn(
                'bzr_remote_path is required as of bzr 0.92',
                DeprecationWarning, stacklevel=2)
            self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._ensure_connection()
        self._write_to.write(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._read_from.close()
        self._write_to.close()
        self._ssh_connection.close()
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected."""
        if self._connected:
            return
        if self._vendor is None:
            vendor = ssh._get_ssh_vendor()
        else:
            vendor = self._vendor
        self._ssh_connection = vendor.connect_ssh(self._username,
                self._password, self._host, self._port,
                command=[self._bzr_remote_path, 'serve', '--inet',
                         '--directory=/', '--allow-writes'])
        self._read_from, self._write_to = \
            self._ssh_connection.get_filelike_channels()
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._write_to.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium.read_bytes."""
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._read_from.read(count)
# Port 4155 is the default port for bzr://, registered with IANA.
BZR_DEFAULT_INTERFACE = '0.0.0.0'
BZR_DEFAULT_PORT = 4155
class SmartTCPClientMedium(SmartClientStreamMedium):
    """A client medium using TCP."""

    def __init__(self, host, port, base):
        """Creates a client that will connect on the first use."""
        SmartClientStreamMedium.__init__(self, base)
        self._connected = False
        self._host = host
        self._port = port
        self._socket = None

    def _accept_bytes(self, bytes):
        """See SmartClientMedium.accept_bytes."""
        self._ensure_connection()
        osutils.send_all(self._socket, bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._socket.close()
        self._socket = None
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected.

        A port of None means BZR_DEFAULT_PORT.

        :raises errors.ConnectionError: if the socket cannot be connected.
        """
        if self._connected:
            return
        self._socket = socket.socket()
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        if self._port is None:
            port = BZR_DEFAULT_PORT
        else:
            port = int(self._port)
        try:
            self._socket.connect((self._host, port))
        except socket.error:
            # exc_info() rather than "except socket.error, err" so that this
            # parses on every Python version.
            err = sys.exc_info()[1]
            # Don't leave a half-made socket behind on failure.
            self._socket.close()
            self._socket = None
            # socket errors either have a (string) or (errno, string) as their
            # args.  args is a tuple in both cases, so pick the message by
            # length instead of indexing [1] blindly (which raised IndexError
            # for the one-element form).
            args = err.args
            if isinstance(args, str):
                err_msg = args
            elif len(args) > 1:
                err_msg = args[1]
            elif args:
                err_msg = args[0]
            else:
                err_msg = str(err)
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
                    (self._host, port, err_msg))
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush().

        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
        add a means to do a flush, but that can be done in the future.
        """

    def _read_bytes(self, count):
        """See SmartClientMedium.read_bytes."""
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._socket.recv(count)
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""

    def __init__(self, medium):
        """Register this request as the medium's current request.

        :raises errors.TooManyConcurrentRequests: if the medium already has a
            current request.
        """
        SmartClientMediumRequest.__init__(self, medium)
        # check that we are safe concurrency wise. If some streams start
        # allowing concurrent requests - i.e. via multiplexing - then this
        # assert should be moved to SmartClientStreamMedium.get_request,
        # and the setting/unsetting of _current_request likewise moved into
        # that class : but its unneeded overhead for now. RBC 20060922
        if self._medium._current_request is not None:
            raise errors.TooManyConcurrentRequests(self._medium)
        self._medium._current_request = self

    def _accept_bytes(self, bytes):
        """See SmartClientMediumRequest._accept_bytes.

        This forwards to self._medium._accept_bytes because we are operating
        on the medium's stream.
        """
        self._medium._accept_bytes(bytes)

    def _finished_reading(self):
        """See SmartClientMediumRequest._finished_reading.

        This clears the _current_request on self._medium to allow a new
        request to be created.

        :raises AssertionError: if this is not the medium's current request.
        """
        if self._medium._current_request is not self:
            raise AssertionError()
        self._medium._current_request = None

    def _finished_writing(self):
        """See SmartClientMediumRequest._finished_writing.

        This invokes self._medium._flush to ensure all bytes are transmitted.
        """
        self._medium._flush()

    def _read_bytes(self, count):
        """See SmartClientMediumRequest._read_bytes.

        This forwards to self._medium._read_bytes because we are operating
        on the medium's stream.
        """
        return self._medium._read_bytes(count)