1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""RemoteTransport client for the smart-server.
19
This module shouldn't be accessed directly. The classes defined here should be
20
imported from bzrlib.smart.
23
__all__ = ['RemoteTransport', 'SmartTCPTransport', 'SmartSSHTransport']
25
from cStringIO import StringIO
33
from bzrlib.smart import client, medium, protocol
35
# must do this otherwise urllib can't parse the urls properly :(
36
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh', 'bzr+http']:
37
transport.register_urlparse_netloc_protocol(scheme)
41
# Port 4155 is the default port for bzr://, registered with IANA.
42
BZR_DEFAULT_PORT = 4155
45
def _recv_tuple(from_file):
    """Read one line from from_file and decode it with _decode_tuple."""
    return _decode_tuple(from_file.readline())
50
def _decode_tuple(req_line):
51
if req_line == None or req_line == '':
53
if req_line[-1] != '\n':
54
raise errors.SmartProtocolError("request %r not terminated" % req_line)
55
return tuple(req_line[:-1].split('\x01'))
58
def _encode_tuple(args):
59
"""Encode the tuple args to a bytestream."""
60
return '\x01'.join(args) + '\n'
63
class SmartProtocolBase(object):
64
"""Methods common to client and server"""
66
# TODO: this only actually accomodates a single block; possibly should
67
# support multiple chunks?
68
def _encode_bulk_data(self, body):
69
"""Encode body as a bulk data chunk."""
70
return ''.join(('%d\n' % len(body), body, 'done\n'))
72
def _serialise_offsets(self, offsets):
73
"""Serialise a readv offset list."""
75
for start, length in offsets:
76
txt.append('%d,%d' % (start, length))
80
class SmartServerRequestProtocolOne(SmartProtocolBase):
81
"""Server-side encoding and decoding logic for smart version 1."""
83
def __init__(self, backing_transport, write_func):
84
self._backing_transport = backing_transport
85
self.excess_buffer = ''
86
self._finished = False
88
self.has_dispatched = False
90
self._body_decoder = None
91
self._write_func = write_func
93
def accept_bytes(self, bytes):
94
"""Take bytes, and advance the internal state machine appropriately.
96
:param bytes: must be a byte string
98
assert isinstance(bytes, str)
99
self.in_buffer += bytes
100
if not self.has_dispatched:
101
if '\n' not in self.in_buffer:
102
# no command line yet
104
self.has_dispatched = True
106
first_line, self.in_buffer = self.in_buffer.split('\n', 1)
108
req_args = _decode_tuple(first_line)
109
self.request = SmartServerRequestHandler(
110
self._backing_transport)
111
self.request.dispatch_command(req_args[0], req_args[1:])
112
if self.request.finished_reading:
114
self.excess_buffer = self.in_buffer
116
self._send_response(self.request.response.args,
117
self.request.response.body)
118
except KeyboardInterrupt:
120
except Exception, exception:
121
# everything else: pass to client, flush, and quit
122
self._send_response(('error', str(exception)))
125
if self.has_dispatched:
127
# nothing to do.XXX: this routine should be a single state
129
self.excess_buffer += self.in_buffer
132
if self._body_decoder is None:
133
self._body_decoder = LengthPrefixedBodyDecoder()
134
self._body_decoder.accept_bytes(self.in_buffer)
135
self.in_buffer = self._body_decoder.unused_data
136
body_data = self._body_decoder.read_pending_data()
137
self.request.accept_body(body_data)
138
if self._body_decoder.finished_reading:
139
self.request.end_of_body()
140
assert self.request.finished_reading, \
141
"no more body, request not finished"
142
if self.request.response is not None:
143
self._send_response(self.request.response.args,
144
self.request.response.body)
145
self.excess_buffer = self.in_buffer
148
assert not self.request.finished_reading, \
149
"no response and we have finished reading."
151
def _send_response(self, args, body=None):
    """Send a smart server response down the output stream.

    :param args: tuple of strings forming the response line; it is encoded
        with _encode_tuple and written first.
    :param body: optional str body.  When given it is written after the
        response line as a bulk data chunk; when None only the response
        line is written.
    """
    assert not self._finished, 'response already sent'
    self._finished = True
    self._write_func(_encode_tuple(args))
    # The body is optional: the error path calls this with args only, so
    # the str check must not run when no body was supplied.
    if body is not None:
        assert isinstance(body, str), 'body must be a str'
        chunk = self._encode_bulk_data(body)
        self._write_func(chunk)
161
def next_read_size(self):
164
if self._body_decoder is None:
167
return self._body_decoder.next_read_size()
170
class LengthPrefixedBodyDecoder(object):
    """Decodes the length-prefixed bulk data.

    The expected input is a decimal length terminated by a newline, that
    many bytes of body, and then the trailer 'done\\n'.  Input may arrive in
    arbitrary pieces; the decoder is a small state machine whose current
    handlers are held in self.state_accept and self.state_read.
    """

    def __init__(self):
        # Body bytes still to be received; None before the length is known
        # and again once the whole body has arrived.
        self.bytes_left = None
        self.finished_reading = False
        # Anything received after the trailer.
        self.unused_data = ''
        self.state_accept = self._state_accept_expecting_length
        self.state_read = self._state_read_no_data
        # Length prefix while it is being read, then decoded body bytes that
        # have not been handed out by read_pending_data yet.
        self._in_buffer = ''
        self._trailer_buffer = ''

    def accept_bytes(self, bytes):
        """Decode as much of bytes as possible.

        If 'bytes' contains too much data it will be appended to
        self.unused_data.

        finished_reading will be set when no more data is required. Further
        data will be appended to self.unused_data.
        """
        # accept_bytes is allowed to change the state
        current_state = self.state_accept
        self.state_accept(bytes)
        # Each time the state changed, poke the new state with no new data so
        # it can process whatever the previous state left buffered.
        while current_state != self.state_accept:
            current_state = self.state_accept
            self.state_accept('')

    def next_read_size(self):
        """Return a suggested number of bytes to read next."""
        if self.bytes_left is not None:
            # Ideally we want to read all the remainder of the body and the
            # trailer in one go (5 == len('done\n')).
            return self.bytes_left + 5
        elif self.state_accept == self._state_accept_reading_trailer:
            # Just the trailer left
            return 5 - len(self._trailer_buffer)
        elif self.state_accept == self._state_accept_expecting_length:
            # There's still at least 6 bytes left ('\n' to end the length, plus
            # 'done\n').
            return 6
        else:
            # Reading excess data. Either way, 1 byte at a time is fine.
            return 1

    def read_pending_data(self):
        """Return any pending data that has been decoded."""
        return self.state_read()

    def _state_accept_expecting_length(self, bytes):
        """Accumulate the length prefix until its newline has arrived."""
        self._in_buffer += bytes
        pos = self._in_buffer.find('\n')
        if pos == -1:
            # The length line is not complete yet; wait for more input.
            return
        self.bytes_left = int(self._in_buffer[:pos])
        self._in_buffer = self._in_buffer[pos+1:]
        # Whatever followed the length line already counts towards the body.
        self.bytes_left -= len(self._in_buffer)
        self.state_accept = self._state_accept_reading_body
        self.state_read = self._state_read_in_buffer

    def _state_accept_reading_body(self, bytes):
        """Accumulate body bytes, splitting off any overshoot."""
        self._in_buffer += bytes
        self.bytes_left -= len(bytes)
        if self.bytes_left <= 0:
            # Finished with body
            if self.bytes_left != 0:
                # bytes_left is negative here: that many bytes at the end of
                # the buffer belong to the trailer (and beyond), not the body.
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
                self._in_buffer = self._in_buffer[:self.bytes_left]
            self.bytes_left = None
            self.state_accept = self._state_accept_reading_trailer

    def _state_accept_reading_trailer(self, bytes):
        """Accumulate the trailer; finish once 'done\\n' has been seen."""
        self._trailer_buffer += bytes
        # TODO: what if the trailer does not match "done\n"? Should this raise
        # a ProtocolViolation exception?
        if self._trailer_buffer.startswith('done\n'):
            self.unused_data = self._trailer_buffer[len('done\n'):]
            self.state_accept = self._state_accept_reading_unused
            self.finished_reading = True

    def _state_accept_reading_unused(self, bytes):
        """Collect everything received after the trailer."""
        self.unused_data += bytes

    def _state_read_no_data(self):
        """Read handler used before any body bytes can exist."""
        return ''

    def _state_read_in_buffer(self):
        """Hand out the buffered body bytes and empty the buffer."""
        result = self._in_buffer
        self._in_buffer = ''
        return result
261
class SmartServerStreamMedium(object):
262
"""Handles smart commands coming over a stream.
264
The stream may be a pipe connected to sshd, or a tcp socket, or an
265
in-process fifo for testing.
267
One instance is created for each connected client; it can serve multiple
268
requests in the lifetime of the connection.
270
The server passes requests through to an underlying backing transport,
271
which will typically be a LocalTransport looking at the server's filesystem.
274
def __init__(self, backing_transport):
275
"""Construct new server.
277
:param backing_transport: Transport for the directory served.
279
# backing_transport could be passed to serve instead of __init__
280
self.backing_transport = backing_transport
281
self.finished = False
284
"""Serve requests until the client disconnects."""
285
# Keep a reference to stderr because the sys module's globals get set to
286
# None during interpreter shutdown.
287
from sys import stderr
289
while not self.finished:
290
protocol = SmartServerRequestProtocolOne(self.backing_transport,
292
self._serve_one_request(protocol)
294
stderr.write("%s terminating on exception %s\n" % (self, e))
297
def _serve_one_request(self, protocol):
298
"""Read one request from input, process, send back a response.
300
:param protocol: a SmartServerRequestProtocol.
303
self._serve_one_request_unguarded(protocol)
304
except KeyboardInterrupt:
307
self.terminate_due_to_error()
309
def terminate_due_to_error(self):
310
"""Called when an unhandled exception from the protocol occurs."""
311
raise NotImplementedError(self.terminate_due_to_error)
314
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
316
def __init__(self, sock, backing_transport):
319
:param sock: the socket the server will read from. It will be put
322
SmartServerStreamMedium.__init__(self, backing_transport)
324
sock.setblocking(True)
327
def _serve_one_request_unguarded(self, protocol):
328
while protocol.next_read_size():
330
protocol.accept_bytes(self.push_back)
333
bytes = self.socket.recv(4096)
337
protocol.accept_bytes(bytes)
339
self.push_back = protocol.excess_buffer
341
def terminate_due_to_error(self):
342
"""Called when an unhandled exception from the protocol occurs."""
343
# TODO: This should log to a server log file, but no such thing
344
# exists yet. Andrew Bennetts 2006-09-29.
348
def _write_out(self, bytes):
349
self.socket.sendall(bytes)
352
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
354
def __init__(self, in_file, out_file, backing_transport):
355
"""Construct new server.
357
:param in_file: Python file from which requests can be read.
358
:param out_file: Python file to write responses.
359
:param backing_transport: Transport for the directory served.
361
SmartServerStreamMedium.__init__(self, backing_transport)
365
def _serve_one_request_unguarded(self, protocol):
367
bytes_to_read = protocol.next_read_size()
368
if bytes_to_read == 0:
369
# Finished serving this request.
372
bytes = self._in.read(bytes_to_read)
374
# Connection has been closed.
378
protocol.accept_bytes(bytes)
380
def terminate_due_to_error(self):
381
# TODO: This should log to a server log file, but no such thing
382
# exists yet. Andrew Bennetts 2006-09-29.
386
def _write_out(self, bytes):
387
self._out.write(bytes)
390
class SmartServerResponse(object):
    """Response generated by SmartServerRequestHandler."""

    def __init__(self, args, body=None):
        """Store the response arguments and optional body.

        :param args: tuple of response arguments.
        :param body: optional response body, None when there is none.
        """
        # Both attributes are read back when the response is sent.
        self.args = args
        self.body = body
397
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
398
# for delivering the data for a request. This could be done with as the
399
# StreamServer, though that would create conflation between request and response
400
# which may be undesirable.
403
class SmartServerRequestHandler(object):
404
"""Protocol logic for smart server.
406
This doesn't handle serialization at all, it just processes requests and
410
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
411
# not contain encoding or decoding logic to allow the wire protocol to vary
412
# from the object protocol: we will want to tweak the wire protocol separate
413
# from the object model, and ideally we will be able to do that without
414
# having a SmartServerRequestHandler subclass for each wire protocol, rather
415
# just a Protocol subclass.
417
# TODO: Better way of representing the body for commands that take it,
418
# and allow it to be streamed into the server.
420
def __init__(self, backing_transport):
421
self._backing_transport = backing_transport
422
self._converted_command = False
423
self.finished_reading = False
424
self._body_bytes = ''
427
def accept_body(self, bytes):
430
This should be overriden for each command that desired body data to
431
handle the right format of that data. I.e. plain bytes, a bundle etc.
433
The deserialisation into that format should be done in the Protocol
434
object. Set self.desired_body_format to the format your method will
437
# default fallback is to accumulate bytes.
438
self._body_bytes += bytes
440
def _end_of_body_handler(self):
441
"""An unimplemented end of body handler."""
442
raise NotImplementedError(self._end_of_body_handler)
445
"""Answer a version request with my version."""
446
return SmartServerResponse(('ok', '1'))
448
def do_has(self, relpath):
    """Answer whether relpath exists on the backing transport.

    :return: a SmartServerResponse whose single argument is 'yes' or 'no'.
    """
    if self._backing_transport.has(relpath):
        answer = 'yes'
    else:
        answer = 'no'
    return SmartServerResponse((answer,))
452
def do_get(self, relpath):
453
backing_bytes = self._backing_transport.get_bytes(relpath)
454
return SmartServerResponse(('ok',), backing_bytes)
456
def _deserialise_optional_mode(self, mode):
457
# XXX: FIXME this should be on the protocol object.
463
def do_append(self, relpath, mode):
464
self._converted_command = True
465
self._relpath = relpath
466
self._mode = self._deserialise_optional_mode(mode)
467
self._end_of_body_handler = self._handle_do_append_end
469
def _handle_do_append_end(self):
470
old_length = self._backing_transport.append_bytes(
471
self._relpath, self._body_bytes, self._mode)
472
self.response = SmartServerResponse(('appended', '%d' % old_length))
474
def do_delete(self, relpath):
475
self._backing_transport.delete(relpath)
477
def do_iter_files_recursive(self, relpath):
478
transport = self._backing_transport.clone(relpath)
479
filenames = transport.iter_files_recursive()
480
return SmartServerResponse(('names',) + tuple(filenames))
482
def do_list_dir(self, relpath):
483
filenames = self._backing_transport.list_dir(relpath)
484
return SmartServerResponse(('names',) + tuple(filenames))
486
def do_mkdir(self, relpath, mode):
487
self._backing_transport.mkdir(relpath,
488
self._deserialise_optional_mode(mode))
490
def do_move(self, rel_from, rel_to):
491
self._backing_transport.move(rel_from, rel_to)
493
def do_put(self, relpath, mode):
494
self._converted_command = True
495
self._relpath = relpath
496
self._mode = self._deserialise_optional_mode(mode)
497
self._end_of_body_handler = self._handle_do_put
499
def _handle_do_put(self):
500
self._backing_transport.put_bytes(self._relpath,
501
self._body_bytes, self._mode)
502
self.response = SmartServerResponse(('ok',))
504
def _deserialise_offsets(self, text):
505
# XXX: FIXME this should be on the protocol object.
507
for line in text.split('\n'):
510
start, length = line.split(',')
511
offsets.append((int(start), int(length)))
514
def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
    """Record the arguments of a put_non_atomic request.

    Nothing is written here: the arguments are stored on self and
    _handle_put_non_atomic is installed as the end-of-body handler.

    :param relpath: path to write, stored unchanged.
    :param mode: serialised optional file mode.
    :param create_parent: 'T' to request parent directory creation; any
        other value means do not create it.
    :param dir_mode: serialised optional directory mode.
    """
    decode_mode = self._deserialise_optional_mode
    self._converted_command = True
    self._end_of_body_handler = self._handle_put_non_atomic
    self._relpath = relpath
    self._dir_mode = decode_mode(dir_mode)
    self._mode = decode_mode(mode)
    # a boolean would be nicer XXX
    self._create_parent = (create_parent == 'T')
523
def _handle_put_non_atomic(self):
524
self._backing_transport.put_bytes_non_atomic(self._relpath,
527
create_parent_dir=self._create_parent,
528
dir_mode=self._dir_mode)
529
self.response = SmartServerResponse(('ok',))
531
def do_readv(self, relpath):
532
self._converted_command = True
533
self._end_of_body_handler = self._handle_readv_offsets
534
self._relpath = relpath
536
def end_of_body(self):
537
"""No more body data will be received."""
538
self._run_handler_code(self._end_of_body_handler, (), {})
539
# cannot read after this.
540
self.finished_reading = True
542
def _handle_readv_offsets(self):
543
"""accept offsets for a readv request."""
544
offsets = self._deserialise_offsets(self._body_bytes)
545
backing_bytes = ''.join(bytes for offset, bytes in
546
self._backing_transport.readv(self._relpath, offsets))
547
self.response = SmartServerResponse(('readv',), backing_bytes)
549
def do_rename(self, rel_from, rel_to):
550
self._backing_transport.rename(rel_from, rel_to)
552
def do_rmdir(self, relpath):
553
self._backing_transport.rmdir(relpath)
555
def do_stat(self, relpath):
556
stat = self._backing_transport.stat(relpath)
557
return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
559
def do_get_bundle(self, path, revision_id):
560
# open transport relative to our base
561
t = self._backing_transport.clone(path)
562
control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
563
repo = control.open_repository()
564
tmpf = tempfile.TemporaryFile()
565
base_revision = revision.NULL_REVISION
566
write_bundle(repo, revision_id, base_revision, tmpf)
568
return SmartServerResponse((), tmpf.read())
570
def dispatch_command(self, cmd, args):
571
"""Deprecated compatibility method.""" # XXX XXX
572
func = getattr(self, 'do_' + cmd, None)
574
raise errors.SmartProtocolError("bad request %r" % (cmd,))
575
self._run_handler_code(func, args, {})
577
def _run_handler_code(self, callable, args, kwargs):
578
"""Run some handler specific code 'callable'.
580
If a result is returned, it is considered to be the commands response,
581
and finished_reading is set true, and its assigned to self.response.
583
Any exceptions caught are translated and a response object created
586
result = self._call_converting_errors(callable, args, kwargs)
587
if result is not None:
588
self.response = result
589
self.finished_reading = True
590
# handle unconverted commands
591
if not self._converted_command:
592
self.finished_reading = True
594
self.response = SmartServerResponse(('ok',))
596
def _call_converting_errors(self, callable, args, kwargs):
597
"""Call callable converting errors to Response objects."""
599
return callable(*args, **kwargs)
600
except errors.NoSuchFile, e:
601
return SmartServerResponse(('NoSuchFile', e.path))
602
except errors.FileExists, e:
603
return SmartServerResponse(('FileExists', e.path))
604
except errors.DirectoryNotEmpty, e:
605
return SmartServerResponse(('DirectoryNotEmpty', e.path))
606
except errors.ShortReadvError, e:
607
return SmartServerResponse(('ShortReadvError',
608
e.path, str(e.offset), str(e.length), str(e.actual)))
609
except UnicodeError, e:
610
# If it is a DecodeError, then most likely we are starting
611
# with a plain string
612
str_or_unicode = e.object
613
if isinstance(str_or_unicode, unicode):
614
# XXX: UTF-8 might have \x01 (our separator byte) in it. We
615
# should escape it somehow.
616
val = 'u:' + str_or_unicode.encode('utf-8')
618
val = 's:' + str_or_unicode.encode('base64')
619
# This handles UnicodeEncodeError or UnicodeDecodeError
620
return SmartServerResponse((e.__class__.__name__,
621
e.encoding, val, str(e.start), str(e.end), e.reason))
622
except errors.TransportNotPossible, e:
623
if e.msg == "readonly transport":
624
return SmartServerResponse(('ReadOnlyError', ))
629
class SmartTCPServer(object):
630
"""Listens on a TCP socket and accepts connections from smart clients"""
632
def __init__(self, backing_transport, host='127.0.0.1', port=0):
633
"""Construct a new server.
635
To actually start it running, call either start_background_thread or
638
:param host: Name of the interface to listen on.
639
:param port: TCP port to listen on, or 0 to allocate a transient port.
641
self._server_socket = socket.socket()
642
self._server_socket.bind((host, port))
643
self.port = self._server_socket.getsockname()[1]
644
self._server_socket.listen(1)
645
self._server_socket.settimeout(1)
646
self.backing_transport = backing_transport
649
# let connections timeout so that we get a chance to terminate
650
# Keep a reference to the exceptions we want to catch because the socket
651
# module's globals get set to None during interpreter shutdown.
652
from socket import timeout as socket_timeout
653
from socket import error as socket_error
654
self._should_terminate = False
655
while not self._should_terminate:
657
self.accept_and_serve()
658
except socket_timeout:
659
# just check if we're asked to stop
661
except socket_error, e:
662
trace.warning("client disconnected: %s", e)
666
"""Return the url of the server"""
667
return "bzr://%s:%d/" % self._server_socket.getsockname()
669
def accept_and_serve(self):
670
conn, client_addr = self._server_socket.accept()
671
# For WIN32, where the timeout value from the listening socket
672
# propagates to the newly accepted socket.
673
conn.setblocking(True)
674
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
675
handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
676
connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
677
connection_thread.setDaemon(True)
678
connection_thread.start()
680
def start_background_thread(self):
681
self._server_thread = threading.Thread(None,
683
name='server-' + self.get_url())
684
self._server_thread.setDaemon(True)
685
self._server_thread.start()
687
def stop_background_thread(self):
688
self._should_terminate = True
689
# At one point we would wait to join the threads here, but it looks
690
# like they don't actually exit. So now we just leave them running
691
# and expect to terminate the process. -- mbp 20070215
692
# self._server_socket.close()
693
## sys.stderr.write("waiting for server thread to finish...")
694
## self._server_thread.join()
697
class SmartTCPServer_for_testing(SmartTCPServer):
698
"""Server suitable for use by transport tests.
700
This server is backed by the process's cwd.
704
self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
705
# The server is set up by default like for ssh access: the client
706
# passes filesystem-absolute paths; therefore the server must look
707
# them up relative to the root directory. it might be better to act
708
# a public server and have the server rewrite paths into the test
710
SmartTCPServer.__init__(self,
711
transport.get_transport(urlutils.local_path_to_url('/')))
714
"""Set up server for testing"""
715
self.start_background_thread()
718
self.stop_background_thread()
721
"""Return the url of the server"""
722
host, port = self._server_socket.getsockname()
723
return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
725
def get_bogus_url(self):
    """Return a URL which will fail to connect"""
    bogus_url = 'bzr://127.0.0.1:1/'
    return bogus_url
730
class _SmartStat(object):
732
def __init__(self, size, mode):
737
class RemoteTransport(transport.Transport):
738
"""Connection to a smart server.
740
The connection holds references to the medium that can be used to send
741
requests to the server.
743
The connection has a notion of the current directory to which it's
744
connected; this is incorporated in filenames passed to the server.
746
This supports some higher-level RPC operations and can also be treated
747
like a Transport to do file-like operations.
749
The connection can be made over a tcp socket, an ssh pipe or a series of
750
http requests. There are concrete subclasses for each type:
751
SmartTCPTransport, etc.
754
# IMPORTANT FOR IMPLEMENTORS: RemoteTransport MUST NOT be given encoding
755
# responsibilities: Put those on SmartClient or similar. This is vital for
756
# the ability to support multiple versions of the smart protocol over time:
757
# RemoteTransport is an adapter from the Transport object model to the
758
# SmartClient model, not an encoder.
760
def __init__(self, url, clone_from=None, medium=None):
763
:param medium: The medium to use for this RemoteTransport. This must be
764
supplied if clone_from is None.
766
### Technically super() here is faulty because Transport's __init__
767
### fails to take 2 parameters, and if super were to choose a silly
768
### initialisation order things would blow up.
769
if not url.endswith('/'):
771
super(RemoteTransport, self).__init__(url)
772
self._scheme, self._username, self._password, self._host, self._port, self._path = \
773
transport.split_url(url)
774
if clone_from is None:
775
self._medium = medium
777
# credentials may be stripped from the base in some circumstances
778
# as yet to be clearly defined or documented, so copy them.
779
self._username = clone_from._username
780
# reuse same connection
781
self._medium = clone_from._medium
782
assert self._medium is not None
784
def abspath(self, relpath):
785
"""Return the full url to the given relative path.
787
@param relpath: the relative path or path components
788
@type relpath: str or list
790
return self._unparse_url(self._remote_path(relpath))
792
def clone(self, relative_url):
793
"""Make a new RemoteTransport related to me, sharing the same connection.
795
This essentially opens a handle on a different remote directory.
797
if relative_url is None:
798
return RemoteTransport(self.base, self)
800
return RemoteTransport(self.abspath(relative_url), self)
802
def is_readonly(self):
803
"""Smart server transport can do read/write file operations."""
806
def get_smart_client(self):
809
def get_smart_medium(self):
812
def _unparse_url(self, path):
813
"""Return URL for a path.
815
:see: SFTPUrlHandling._unparse_url
817
# TODO: Eventually it should be possible to unify this with
818
# SFTPUrlHandling._unparse_url?
821
path = urllib.quote(path)
822
netloc = urllib.quote(self._host)
823
if self._username is not None:
824
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
825
if self._port is not None:
826
netloc = '%s:%d' % (netloc, self._port)
827
return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
829
def _remote_path(self, relpath):
830
"""Returns the Unicode version of the absolute path for relpath."""
831
return self._combine_paths(self._path, relpath)
833
def _call(self, method, *args):
834
resp = self._call2(method, *args)
835
self._translate_error(resp)
837
def _call2(self, method, *args):
838
"""Call a method on the remote server."""
839
return client.SmartClient(self._medium).call(method, *args)
841
def _call_with_body_bytes(self, method, args, body):
842
"""Call a method on the remote server with body bytes."""
843
smart_client = client.SmartClient(self._medium)
844
return smart_client.call_with_body_bytes(method, args, body)
846
def has(self, relpath):
847
"""Indicate whether a remote file of the given name exists or not.
849
:see: Transport.has()
851
resp = self._call2('has', self._remote_path(relpath))
852
if resp == ('yes', ):
854
elif resp == ('no', ):
857
self._translate_error(resp)
859
def get(self, relpath):
860
"""Return file-like object reading the contents of a remote file.
862
:see: Transport.get_bytes()/get_file()
864
return StringIO(self.get_bytes(relpath))
866
def get_bytes(self, relpath):
867
remote = self._remote_path(relpath)
868
request = self._medium.get_request()
869
smart_protocol = protocol.SmartClientRequestProtocolOne(request)
870
smart_protocol.call('get', remote)
871
resp = smart_protocol.read_response_tuple(True)
873
smart_protocol.cancel_read_body()
874
self._translate_error(resp, relpath)
875
return smart_protocol.read_body_bytes()
877
def _serialise_optional_mode(self, mode):
883
def mkdir(self, relpath, mode=None):
884
resp = self._call2('mkdir', self._remote_path(relpath),
885
self._serialise_optional_mode(mode))
886
self._translate_error(resp)
888
def put_bytes(self, relpath, upload_contents, mode=None):
889
# FIXME: upload_file is probably not safe for non-ascii characters -
890
# should probably just pass all parameters as length-delimited
892
resp = self._call_with_body_bytes('put',
893
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
895
self._translate_error(resp)
897
def put_bytes_non_atomic(self, relpath, bytes, mode=None,
898
create_parent_dir=False,
900
"""See Transport.put_bytes_non_atomic."""
901
# FIXME: no encoding in the transport!
902
create_parent_str = 'F'
903
if create_parent_dir:
904
create_parent_str = 'T'
906
resp = self._call_with_body_bytes(
908
(self._remote_path(relpath), self._serialise_optional_mode(mode),
909
create_parent_str, self._serialise_optional_mode(dir_mode)),
911
self._translate_error(resp)
913
def put_file(self, relpath, upload_file, mode=None):
914
# its not ideal to seek back, but currently put_non_atomic_file depends
915
# on transports not reading before failing - which is a faulty
916
# assumption I think - RBC 20060915
917
pos = upload_file.tell()
919
return self.put_bytes(relpath, upload_file.read(), mode)
921
upload_file.seek(pos)
924
def put_file_non_atomic(self, relpath, f, mode=None,
925
create_parent_dir=False,
927
return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
928
create_parent_dir=create_parent_dir,
931
def append_file(self, relpath, from_file, mode=None):
932
return self.append_bytes(relpath, from_file.read(), mode)
934
def append_bytes(self, relpath, bytes, mode=None):
935
resp = self._call_with_body_bytes(
937
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
939
if resp[0] == 'appended':
941
self._translate_error(resp)
943
def delete(self, relpath):
944
resp = self._call2('delete', self._remote_path(relpath))
945
self._translate_error(resp)
947
def readv(self, relpath, offsets):
951
offsets = list(offsets)
953
sorted_offsets = sorted(offsets)
954
# turn the list of offsets into a stack
955
offset_stack = iter(offsets)
956
cur_offset_and_size = offset_stack.next()
957
coalesced = list(self._coalesce_offsets(sorted_offsets,
958
limit=self._max_readv_combine,
959
fudge_factor=self._bytes_to_read_before_seek))
961
request = self._medium.get_request()
962
smart_protocol = protocol.SmartClientRequestProtocolOne(request)
963
smart_protocol.call_with_body_readv_array(
964
('readv', self._remote_path(relpath)),
965
[(c.start, c.length) for c in coalesced])
966
resp = smart_protocol.read_response_tuple(True)
968
if resp[0] != 'readv':
969
# This should raise an exception
970
smart_protocol.cancel_read_body()
971
self._translate_error(resp)
974
# FIXME: this should know how many bytes are needed, for clarity.
975
data = smart_protocol.read_body_bytes()
976
# Cache the results, but only until they have been fulfilled
978
for c_offset in coalesced:
979
if len(data) < c_offset.length:
980
raise errors.ShortReadvError(relpath, c_offset.start,
981
c_offset.length, actual=len(data))
982
for suboffset, subsize in c_offset.ranges:
983
key = (c_offset.start+suboffset, subsize)
984
data_map[key] = data[suboffset:suboffset+subsize]
985
data = data[c_offset.length:]
987
# Now that we've read some data, see if we can yield anything back
988
while cur_offset_and_size in data_map:
989
this_data = data_map.pop(cur_offset_and_size)
990
yield cur_offset_and_size[0], this_data
991
cur_offset_and_size = offset_stack.next()
993
def rename(self, rel_from, rel_to):
995
self._remote_path(rel_from),
996
self._remote_path(rel_to))
998
def move(self, rel_from, rel_to):
1000
self._remote_path(rel_from),
1001
self._remote_path(rel_to))
1003
def rmdir(self, relpath):
1004
resp = self._call('rmdir', self._remote_path(relpath))
1006
def _translate_error(self, resp, orig_path=None):
1007
"""Raise an exception from a response"""
1014
elif what == 'NoSuchFile':
1015
if orig_path is not None:
1016
error_path = orig_path
1018
error_path = resp[1]
1019
raise errors.NoSuchFile(error_path)
1020
elif what == 'error':
1021
raise errors.SmartProtocolError(unicode(resp[1]))
1022
elif what == 'FileExists':
1023
raise errors.FileExists(resp[1])
1024
elif what == 'DirectoryNotEmpty':
1025
raise errors.DirectoryNotEmpty(resp[1])
1026
elif what == 'ShortReadvError':
1027
raise errors.ShortReadvError(resp[1], int(resp[2]),
1028
int(resp[3]), int(resp[4]))
1029
elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
1030
encoding = str(resp[1]) # encoding must always be a string
1032
start = int(resp[3])
1034
reason = str(resp[5]) # reason must always be a string
1035
if val.startswith('u:'):
1036
val = val[2:].decode('utf-8')
1037
elif val.startswith('s:'):
1038
val = val[2:].decode('base64')
1039
if what == 'UnicodeDecodeError':
1040
raise UnicodeDecodeError(encoding, val, start, end, reason)
1041
elif what == 'UnicodeEncodeError':
1042
raise UnicodeEncodeError(encoding, val, start, end, reason)
1043
elif what == "ReadOnlyError":
1044
raise errors.TransportNotPossible('readonly transport')
1046
raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
1048
def disconnect(self):
    """Disconnect the medium this transport is using."""
    client_medium = self._medium
    client_medium.disconnect()
1051
def delete_tree(self, relpath):
    """Refuse to delete a tree: this always raises.

    :param relpath: ignored
    :raises TransportNotPossible: unconditionally
    """
    message = 'readonly transport'
    raise errors.TransportNotPossible(message)
1054
def stat(self, relpath):
    """Send a 'stat' request for relpath and wrap the answer.

    :param relpath: relative path; mapped through ``_remote_path``
    :return: a _SmartStat built from the two numeric fields of a 'stat'
        response. Any other response is handed to ``_translate_error``.
    """
    reply = self._call2('stat', self._remote_path(relpath))
    if reply[0] == 'stat':
        # presumably (size, mode) -- _SmartStat is defined elsewhere in
        # this file; the second field is parsed as octal.
        first_field = int(reply[1])
        octal_field = int(reply[2], 8)
        return _SmartStat(first_field, octal_field)
    self._translate_error(reply)
1061
## def lock_read(self, relpath):
1062
## """Lock the given file for shared (read) access.
1063
## :return: A lock object, which should be passed to Transport.unlock()
1065
## # The old RemoteBranch ignored locks for reading, so we will
1066
## # continue that tradition and return a bogus lock object.
1067
## class BogusLock(object):
1068
## def __init__(self, path):
1070
## def unlock(self):
1072
## return BogusLock(relpath)
1077
def list_dir(self, relpath):
    """Send a 'list_dir' request for relpath and return the names.

    :param relpath: relative path; mapped through ``_remote_path``
    :return: for a 'names' response, a list holding each reported name
        encoded as ASCII. Any other response is handed to
        ``_translate_error``.
    """
    reply = self._call2('list_dir', self._remote_path(relpath))
    if reply[0] == 'names':
        entries = []
        for entry in reply[1:]:
            entries.append(entry.encode('ascii'))
        return entries
    self._translate_error(reply)
1084
def iter_files_recursive(self):
    """Send an 'iter_files_recursive' request for this transport's root.

    :return: for a 'names' response, the names that follow the response
        name. Any other response is handed to ``_translate_error``.
    """
    resp = self._call2('iter_files_recursive', self._remote_path(''))
    if resp[0] == 'names':
        # A successful reply must be returned to the caller, not passed to
        # the error translator.
        return resp[1:]
    self._translate_error(resp)
1092
class SmartTCPTransport(RemoteTransport):
    """Connection to smart server over plain tcp.

    This is essentially just a factory to get 'RemoteTransport(url,
    SmartTCPClientMedium)'.
    """

    def __init__(self, url):
        """Split url and connect a SmartTCPClientMedium to its host.

        :param url: url of the server; a missing port means
            BZR_DEFAULT_PORT.
        :raises InvalidURL: if the port in url is not an integer.
        """
        _scheme, _username, _password, _host, _port, _path = \
            transport.split_url(url)
        if _port is None:
            _port = BZR_DEFAULT_PORT
        else:
            try:
                _port = int(_port)
            except (ValueError, TypeError):
                raise errors.InvalidURL(
                    path=url, extra="invalid port %s" % _port)
        client_medium = medium.SmartTCPClientMedium(_host, _port)
        super(SmartTCPTransport, self).__init__(url, medium=client_medium)
1114
class SmartSSHTransport(RemoteTransport):
    """Connection to smart server over SSH.

    This is essentially just a factory to get 'RemoteTransport(url,
    SmartSSHClientMedium)'.
    """

    def __init__(self, url):
        """Split url and hand its parts to a SmartSSHClientMedium.

        :param url: url of the server; a missing port is passed on as
            None.
        :raises InvalidURL: if the port in url is not an integer.
        """
        _scheme, _username, _password, _host, _port, _path = \
            transport.split_url(url)
        if _port is not None:
            try:
                _port = int(_port)
            except (ValueError, TypeError):
                raise errors.InvalidURL(path=url, extra="invalid port %s" %
                    _port)
        client_medium = medium.SmartSSHClientMedium(_host, _port,
                                                    _username, _password)
        super(SmartSSHTransport, self).__init__(url, medium=client_medium)
1135
class SmartHTTPTransport(RemoteTransport):
1136
"""Just a way to connect between a bzr+http:// url and http://.
1138
This connection operates slightly differently than the SmartSSHTransport.
1139
It uses a plain http:// transport underneath, which defines what remote
1140
.bzr/smart URL we are connected to. From there, all paths that are sent are
1141
sent as relative paths, this way, the remote side can properly
1142
de-reference them, since it is likely doing rewrite rules to translate an
1143
HTTP path into a local path.
1146
def __init__(self, url, http_transport=None):
    """Wrap an http transport so smart requests can be sent through it.

    :param url: a url starting with 'bzr+http://'
    :param http_transport: an existing http transport to reuse. When None,
        one is obtained for url with the leading 'bzr+' removed.
    """
    assert url.startswith('bzr+http://')

    # Only build a fresh transport when the caller did not supply one; a
    # supplied transport must be kept as given.
    if http_transport is None:
        http_url = url[len('bzr+'):]
        http_transport = transport.get_transport(http_url)
    self._http_transport = http_transport
    http_medium = self._http_transport.get_smart_medium()
    super(SmartHTTPTransport, self).__init__(url, medium=http_medium)
1157
def _remote_path(self, relpath):
1158
"""After connecting HTTP Transport only deals in relative URLs."""
1164
def abspath(self, relpath):
    """Return the full url to the given relative path.

    :param relpath: the relative path or path components
    :type relpath: str or list
    """
    combined = self._combine_paths(self._path, relpath)
    return self._unparse_url(combined)
1172
def clone(self, relative_url):
    """Make a new SmartHTTPTransport related to me.

    This is re-implemented rather than using the default
    SmartTransport.clone() because we must be careful about the underlying
    http transport.

    :param relative_url: location of the clone relative to this transport;
        a false value (None or '') clones at this transport's own base.
    :return: a SmartHTTPTransport wrapping a clone of the underlying http
        transport.
    """
    # Without a relative url there is nothing to combine, so use the base
    # of this transport rather than passing an empty value to abspath().
    if relative_url:
        abs_url = self.abspath(relative_url)
    else:
        abs_url = self.base
    # By cloning the underlying http_transport, we are able to share the
    # connection.
    new_transport = self._http_transport.clone(relative_url)
    return SmartHTTPTransport(abs_url, http_transport=new_transport)
1189
def get_test_permutations():
    """Return (transport, server) permutations for testing."""
    ### We may need a little more test framework support to construct an
    ### appropriate RemoteTransport in the future.
    # Imported here rather than at module level, as in the original.
    from bzrlib.smart import server
    tcp_permutation = (SmartTCPTransport, server.SmartTCPServer_for_testing)
    return [tcp_permutation]