/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

  • Committer: Robert Collins
  • Date: 2006-09-18 05:35:44 UTC
  • mto: (1986.4.4 test_ancestry.py)
  • mto: This revision was merged to the branch mainline in revision 2059.
  • Revision ID: robertc@robertcollins.net-20060918053544-531ddf7fd2ae877b
New test base class TestCaseWithMemoryTransport offers memory-only
testing facilities: its not suitable for tests that need to mutate disk
state, but most tests should not need that and should be converted to
TestCaseWithMemoryTransport. (Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""RemoteTransport client for the smart-server.
18
 
 
19
 
This module shouldn't be accessed directly.  The classes defined here should be
20
 
imported from bzrlib.smart.
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Smart-server protocol, client and server.
 
18
 
 
19
Requests are sent as a command and list of arguments, followed by optional
 
20
bulk body data.  Responses are similarly a response and list of arguments,
 
21
followed by bulk body data. ::
 
22
 
 
23
  SEP := '\001'
 
24
    Fields are separated by Ctrl-A.
 
25
  BULK_DATA := CHUNK+ TRAILER
 
26
    Chunks can be repeated as many times as necessary.
 
27
  CHUNK := CHUNK_LEN CHUNK_BODY
 
28
  CHUNK_LEN := DIGIT+ NEWLINE
 
29
    Gives the number of bytes in the following chunk.
 
30
  CHUNK_BODY := BYTE[chunk_len]
 
31
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
 
32
  SUCCESS_TRAILER := 'done' NEWLINE
 
33
  ERROR_TRAILER := 
 
34
 
 
35
Paths are passed across the network.  The client needs to see a namespace that
 
36
includes any repository that might need to be referenced, and the client needs
 
37
to know about a root directory beyond which it cannot ascend.
 
38
 
 
39
Servers run over ssh will typically want to be able to access any path the user 
 
40
can access.  Public servers on the other hand (which might be over http, ssh
 
41
or tcp) will typically want to restrict access to only a particular directory 
 
42
and its children, so will want to do a software virtual root at that level.
 
43
In other words they'll want to rewrite incoming paths to be under that level
 
44
(and prevent escaping using ../ tricks.)
 
45
 
 
46
URLs that include ~ should probably be passed across to the server verbatim
 
47
and the server can expand them.  This will proably not be meaningful when 
 
48
limited to a directory?
21
49
"""
22
50
 
23
 
__all__ = ['RemoteTransport', 'RemoteTCPTransport', 'RemoteSSHTransport']
 
51
 
 
52
 
 
53
# TODO: A plain integer from query_version is too simple; should give some
 
54
# capabilities too?
 
55
 
 
56
# TODO: Server should probably catch exceptions within itself and send them
 
57
# back across the network.  (But shouldn't catch KeyboardInterrupt etc)
 
58
# Also needs to somehow report protocol errors like bad requests.  Need to
 
59
# consider how we'll handle error reporting, e.g. if we get halfway through a
 
60
# bulk transfer and then something goes wrong.
 
61
 
 
62
# TODO: Standard marker at start of request/response lines?
 
63
 
 
64
# TODO: Make each request and response self-validatable, e.g. with checksums.
 
65
#
 
66
# TODO: get/put objects could be changed to gradually read back the data as it
 
67
# comes across the network
 
68
#
 
69
# TODO: What should the server do if it hits an error and has to terminate?
 
70
#
 
71
# TODO: is it useful to allow multiple chunks in the bulk data?
 
72
#
 
73
# TODO: If we get an exception during transmission of bulk data we can't just
 
74
# emit the exception because it won't be seen.
 
75
#   John proposes:  I think it would be worthwhile to have a header on each
 
76
#   chunk, that indicates it is another chunk. Then you can send an 'error'
 
77
#   chunk as long as you finish the previous chunk.
 
78
#
 
79
# TODO: Clone method on Transport; should work up towards parent directory;
 
80
# unclear how this should be stored or communicated to the server... maybe
 
81
# just pass it on all relevant requests?
 
82
#
 
83
# TODO: Better name than clone() for changing between directories.  How about
 
84
# open_dir or change_dir or chdir?
 
85
#
 
86
# TODO: Is it really good to have the notion of current directory within the
 
87
# connection?  Perhaps all Transports should factor out a common connection
 
88
# from the thing that has the directory context?
 
89
#
 
90
# TODO: Pull more things common to sftp and ssh to a higher level.
 
91
#
 
92
# TODO: The server that manages a connection should be quite small and retain
 
93
# minimum state because each of the requests are supposed to be stateless.
 
94
# Then we can write another implementation that maps to http.
 
95
#
 
96
# TODO: What to do when a client connection is garbage collected?  Maybe just
 
97
# abruptly drop the connection?
 
98
#
 
99
# TODO: Server in some cases will need to restrict access to files outside of
 
100
# a particular root directory.  LocalTransport doesn't do anything to stop you
 
101
# ascending above the base directory, so we need to prevent paths
 
102
# containing '..' in either the server or transport layers.  (Also need to
 
103
# consider what happens if someone creates a symlink pointing outside the 
 
104
# directory tree...)
 
105
#
 
106
# TODO: Server should rebase absolute paths coming across the network to put
 
107
# them under the virtual root, if one is in use.  LocalTransport currently
 
108
# doesn't do that; if you give it an absolute path it just uses it.
 
109
 
110
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
 
111
# urlescape them instead.  Indeed possibly this should just literally be
 
112
# http-over-ssh.
 
113
#
 
114
# FIXME: This transport, with several others, has imperfect handling of paths
 
115
# within urls.  It'd probably be better for ".." from a root to raise an error
 
116
# rather than return the same directory as we do at present.
 
117
#
 
118
# TODO: Rather than working at the Transport layer we want a Branch,
 
119
# Repository or BzrDir objects that talk to a server.
 
120
#
 
121
# TODO: Probably want some way for server commands to gradually produce body
 
122
# data rather than passing it as a string; they could perhaps pass an
 
123
# iterator-like callback that will gradually yield data; it probably needs a
 
124
# close() method that will always be closed to do any necessary cleanup.
 
125
#
 
126
# TODO: Split the actual smart server from the ssh encoding of it.
 
127
#
 
128
# TODO: Perhaps support file-level readwrite operations over the transport
 
129
# too.
 
130
#
 
131
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
 
132
# branch doing file-level operations.
 
133
#
 
134
# TODO: jam 20060915 _decode_tuple is acting directly on input over
 
135
#       the socket, and it assumes everything is UTF8 sections separated
 
136
#       by \001. Which means a request like '\002' Will abort the connection
 
137
#       because of a UnicodeDecodeError. It does look like invalid data will
 
138
#       kill the SmartStreamServer, but only with an abort + exception, and 
 
139
#       the overall server shouldn't die.
24
140
 
25
141
from cStringIO import StringIO
 
142
import errno
 
143
import os
 
144
import socket
 
145
import sys
 
146
import tempfile
 
147
import threading
 
148
import urllib
 
149
import urlparse
26
150
 
27
151
from bzrlib import (
28
 
    config,
29
 
    debug,
 
152
    bzrdir,
30
153
    errors,
31
 
    remote,
 
154
    revision,
 
155
    transport,
32
156
    trace,
33
 
    transport,
34
157
    urlutils,
35
158
    )
36
 
from bzrlib.smart import client, medium
37
 
from bzrlib.symbol_versioning import (
38
 
    deprecated_method,
39
 
    )
40
 
 
41
 
 
42
 
class _SmartStat(object):
 
159
from bzrlib.bundle.serializer import write_bundle
 
160
from bzrlib.trace import mutter
 
161
from bzrlib.transport import local
 
162
 
 
163
# must do this otherwise urllib can't parse the urls properly :(
 
164
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
 
165
    transport.register_urlparse_netloc_protocol(scheme)
 
166
del scheme
 
167
 
 
168
 
 
169
def _recv_tuple(from_file):
 
170
    req_line = from_file.readline()
 
171
    return _decode_tuple(req_line)
 
172
 
 
173
 
 
174
def _decode_tuple(req_line):
 
175
    if req_line == None or req_line == '':
 
176
        return None
 
177
    if req_line[-1] != '\n':
 
178
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
 
179
    return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
 
180
 
 
181
 
 
182
def _encode_tuple(args):
 
183
    """Encode the tuple args to a bytestream."""
 
184
    return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
 
185
 
 
186
 
 
187
class SmartProtocolBase(object):
 
188
    """Methods common to client and server"""
 
189
 
 
190
    def _send_bulk_data(self, body):
 
191
        """Send chunked body data"""
 
192
        assert isinstance(body, str)
 
193
        bytes = ''.join(('%d\n' % len(body), body, 'done\n'))
 
194
        self._write_and_flush(bytes)
 
195
 
 
196
    # TODO: this only actually accomodates a single block; possibly should support
 
197
    # multiple chunks?
 
198
    def _recv_bulk(self):
 
199
        chunk_len = self._in.readline()
 
200
        try:
 
201
            chunk_len = int(chunk_len)
 
202
        except ValueError:
 
203
            raise errors.SmartProtocolError("bad chunk length line %r" % chunk_len)
 
204
        bulk = self._in.read(chunk_len)
 
205
        if len(bulk) != chunk_len:
 
206
            raise errors.SmartProtocolError("short read fetching bulk data chunk")
 
207
        self._recv_trailer()
 
208
        return bulk
 
209
 
 
210
    def _recv_tuple(self):
 
211
        return _recv_tuple(self._in)
 
212
 
 
213
    def _recv_trailer(self):
 
214
        resp = self._recv_tuple()
 
215
        if resp == ('done', ):
 
216
            return
 
217
        else:
 
218
            self._translate_error(resp)
 
219
 
 
220
    def _serialise_offsets(self, offsets):
 
221
        """Serialise a readv offset list."""
 
222
        txt = []
 
223
        for start, length in offsets:
 
224
            txt.append('%d,%d' % (start, length))
 
225
        return '\n'.join(txt)
 
226
 
 
227
    def _write_and_flush(self, bytes):
 
228
        """Write bytes to self._out and flush it."""
 
229
        # XXX: this will be inefficient.  Just ask Robert.
 
230
        self._out.write(bytes)
 
231
        self._out.flush()
 
232
 
 
233
 
 
234
class SmartStreamServer(SmartProtocolBase):
 
235
    """Handles smart commands coming over a stream.
 
236
 
 
237
    The stream may be a pipe connected to sshd, or a tcp socket, or an
 
238
    in-process fifo for testing.
 
239
 
 
240
    One instance is created for each connected client; it can serve multiple
 
241
    requests in the lifetime of the connection.
 
242
 
 
243
    The server passes requests through to an underlying backing transport, 
 
244
    which will typically be a LocalTransport looking at the server's filesystem.
 
245
    """
 
246
 
 
247
    def __init__(self, in_file, out_file, backing_transport):
 
248
        """Construct new server.
 
249
 
 
250
        :param in_file: Python file from which requests can be read.
 
251
        :param out_file: Python file to write responses.
 
252
        :param backing_transport: Transport for the directory served.
 
253
        """
 
254
        self._in = in_file
 
255
        self._out = out_file
 
256
        self.smart_server = SmartServer(backing_transport)
 
257
        # server can call back to us to get bulk data - this is not really
 
258
        # ideal, they should get it per request instead
 
259
        self.smart_server._recv_body = self._recv_bulk
 
260
 
 
261
    def _recv_tuple(self):
 
262
        """Read a request from the client and return as a tuple.
 
263
        
 
264
        Returns None at end of file (if the client closed the connection.)
 
265
        """
 
266
        return _recv_tuple(self._in)
 
267
 
 
268
    def _send_tuple(self, args):
 
269
        """Send response header"""
 
270
        return self._write_and_flush(_encode_tuple(args))
 
271
 
 
272
    def _send_error_and_disconnect(self, exception):
 
273
        self._send_tuple(('error', str(exception)))
 
274
        ## self._out.close()
 
275
        ## self._in.close()
 
276
 
 
277
    def _serve_one_request(self):
 
278
        """Read one request from input, process, send back a response.
 
279
        
 
280
        :return: False if the server should terminate, otherwise None.
 
281
        """
 
282
        req_args = self._recv_tuple()
 
283
        if req_args == None:
 
284
            # client closed connection
 
285
            return False  # shutdown server
 
286
        try:
 
287
            response = self.smart_server.dispatch_command(req_args[0], req_args[1:])
 
288
            self._send_tuple(response.args)
 
289
            if response.body is not None:
 
290
                self._send_bulk_data(response.body)
 
291
        except KeyboardInterrupt:
 
292
            raise
 
293
        except Exception, e:
 
294
            # everything else: pass to client, flush, and quit
 
295
            self._send_error_and_disconnect(e)
 
296
            return False
 
297
 
 
298
    def serve(self):
 
299
        """Serve requests until the client disconnects."""
 
300
        # Keep a reference to stderr because the sys module's globals get set to
 
301
        # None during interpreter shutdown.
 
302
        from sys import stderr
 
303
        try:
 
304
            while self._serve_one_request() != False:
 
305
                pass
 
306
        except Exception, e:
 
307
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
308
            raise
 
309
 
 
310
 
 
311
class SmartServerResponse(object):
 
312
    """Response generated by SmartServer."""
 
313
 
 
314
    def __init__(self, args, body=None):
 
315
        self.args = args
 
316
        self.body = body
 
317
 
 
318
# XXX: TODO: Create a SmartServerRequest which will take the responsibility
 
319
# for delivering the data for a request. This could be done with as the
 
320
# StreamServer, though that would create conflation between request and response
 
321
# which may be undesirable.
 
322
 
 
323
 
 
324
class SmartServer(object):
 
325
    """Protocol logic for smart server.
 
326
    
 
327
    This doesn't handle serialization at all, it just processes requests and
 
328
    creates responses.
 
329
    """
 
330
 
 
331
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServer not contain
 
332
    # encoding or decoding logic to allow the wire protocol to vary from the
 
333
    # object protocol: we will want to tweak the wire protocol separate from
 
334
    # the object model, and ideally we will be able to do that without having
 
335
    # a SmartServer subclass for each wire protocol, rather just a Protocol
 
336
    # subclass.
 
337
 
 
338
    # TODO: Better way of representing the body for commands that take it,
 
339
    # and allow it to be streamed into the server.
 
340
    
 
341
    def __init__(self, backing_transport):
 
342
        self._backing_transport = backing_transport
 
343
        
 
344
    def do_hello(self):
 
345
        """Answer a version request with my version."""
 
346
        return SmartServerResponse(('ok', '1'))
 
347
 
 
348
    def do_has(self, relpath):
 
349
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
 
350
        return SmartServerResponse((r,))
 
351
 
 
352
    def do_get(self, relpath):
 
353
        backing_bytes = self._backing_transport.get_bytes(relpath)
 
354
        return SmartServerResponse(('ok',), backing_bytes)
 
355
 
 
356
    def _deserialise_optional_mode(self, mode):
 
357
        # XXX: FIXME this should be on the protocol object.
 
358
        if mode == '':
 
359
            return None
 
360
        else:
 
361
            return int(mode)
 
362
 
 
363
    def do_append(self, relpath, mode):
 
364
        old_length = self._backing_transport.append_bytes(
 
365
            relpath, self._recv_body(), self._deserialise_optional_mode(mode))
 
366
        return SmartServerResponse(('appended', '%d' % old_length))
 
367
 
 
368
    def do_delete(self, relpath):
 
369
        self._backing_transport.delete(relpath)
 
370
 
 
371
    def do_iter_files_recursive(self, abspath):
 
372
        # XXX: the path handling needs some thought.
 
373
        #relpath = self._backing_transport.relpath(abspath)
 
374
        transport = self._backing_transport.clone(abspath)
 
375
        filenames = transport.iter_files_recursive()
 
376
        return SmartServerResponse(('names',) + tuple(filenames))
 
377
 
 
378
    def do_list_dir(self, relpath):
 
379
        filenames = self._backing_transport.list_dir(relpath)
 
380
        return SmartServerResponse(('names',) + tuple(filenames))
 
381
 
 
382
    def do_mkdir(self, relpath, mode):
 
383
        self._backing_transport.mkdir(relpath,
 
384
                                      self._deserialise_optional_mode(mode))
 
385
 
 
386
    def do_move(self, rel_from, rel_to):
 
387
        self._backing_transport.move(rel_from, rel_to)
 
388
 
 
389
    def do_put(self, relpath, mode):
 
390
        self._backing_transport.put_bytes(relpath,
 
391
                self._recv_body(),
 
392
                self._deserialise_optional_mode(mode))
 
393
 
 
394
    def _deserialise_offsets(self, text):
 
395
        # XXX: FIXME this should be on the protocol object.
 
396
        offsets = []
 
397
        for line in text.split('\n'):
 
398
            if not line:
 
399
                continue
 
400
            start, length = line.split(',')
 
401
            offsets.append((int(start), int(length)))
 
402
        return offsets
 
403
 
 
404
    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
 
405
        create_parent_dir = (create_parent == 'T')
 
406
        self._backing_transport.put_bytes_non_atomic(relpath,
 
407
                self._recv_body(),
 
408
                mode=self._deserialise_optional_mode(mode),
 
409
                create_parent_dir=create_parent_dir,
 
410
                dir_mode=self._deserialise_optional_mode(dir_mode))
 
411
 
 
412
    def do_readv(self, relpath):
 
413
        offsets = self._deserialise_offsets(self._recv_body())
 
414
        backing_bytes = ''.join(bytes for offset, bytes in
 
415
                             self._backing_transport.readv(relpath, offsets))
 
416
        return SmartServerResponse(('readv',), backing_bytes)
 
417
        
 
418
    def do_rename(self, rel_from, rel_to):
 
419
        self._backing_transport.rename(rel_from, rel_to)
 
420
 
 
421
    def do_rmdir(self, relpath):
 
422
        self._backing_transport.rmdir(relpath)
 
423
 
 
424
    def do_stat(self, relpath):
 
425
        stat = self._backing_transport.stat(relpath)
 
426
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
 
427
        
 
428
    def do_get_bundle(self, path, revision_id):
 
429
        # open transport relative to our base
 
430
        t = self._backing_transport.clone(path)
 
431
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
 
432
        repo = control.open_repository()
 
433
        tmpf = tempfile.TemporaryFile()
 
434
        base_revision = revision.NULL_REVISION
 
435
        write_bundle(repo, revision_id, base_revision, tmpf)
 
436
        tmpf.seek(0)
 
437
        return SmartServerResponse((), tmpf.read())
 
438
 
 
439
    def dispatch_command(self, cmd, args):
 
440
        func = getattr(self, 'do_' + cmd, None)
 
441
        if func is None:
 
442
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
 
443
        try:
 
444
            result = func(*args)
 
445
            if result is None: 
 
446
                result = SmartServerResponse(('ok',))
 
447
            return result
 
448
        except errors.NoSuchFile, e:
 
449
            return SmartServerResponse(('NoSuchFile', e.path))
 
450
        except errors.FileExists, e:
 
451
            return SmartServerResponse(('FileExists', e.path))
 
452
        except errors.DirectoryNotEmpty, e:
 
453
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
 
454
        except errors.ShortReadvError, e:
 
455
            return SmartServerResponse(('ShortReadvError',
 
456
                e.path, str(e.offset), str(e.length), str(e.actual)))
 
457
        except UnicodeError, e:
 
458
            # If it is a DecodeError, than most likely we are starting
 
459
            # with a plain string
 
460
            str_or_unicode = e.object
 
461
            if isinstance(str_or_unicode, unicode):
 
462
                val = u'u:' + str_or_unicode
 
463
            else:
 
464
                val = u's:' + str_or_unicode.encode('base64')
 
465
            # This handles UnicodeEncodeError or UnicodeDecodeError
 
466
            return SmartServerResponse((e.__class__.__name__,
 
467
                    e.encoding, val, str(e.start), str(e.end), e.reason))
 
468
 
 
469
 
 
470
class SmartTCPServer(object):
 
471
    """Listens on a TCP socket and accepts connections from smart clients"""
 
472
 
 
473
    def __init__(self, backing_transport=None, host='127.0.0.1', port=0):
 
474
        """Construct a new server.
 
475
 
 
476
        To actually start it running, call either start_background_thread or
 
477
        serve.
 
478
 
 
479
        :param host: Name of the interface to listen on.
 
480
        :param port: TCP port to listen on, or 0 to allocate a transient port.
 
481
        """
 
482
        if backing_transport is None:
 
483
            backing_transport = memory.MemoryTransport()
 
484
        self._server_socket = socket.socket()
 
485
        self._server_socket.bind((host, port))
 
486
        self.port = self._server_socket.getsockname()[1]
 
487
        self._server_socket.listen(1)
 
488
        self._server_socket.settimeout(1)
 
489
        self.backing_transport = backing_transport
 
490
 
 
491
    def serve(self):
 
492
        # let connections timeout so that we get a chance to terminate
 
493
        # Keep a reference to the exceptions we want to catch because the socket
 
494
        # module's globals get set to None during interpreter shutdown.
 
495
        from socket import timeout as socket_timeout
 
496
        from socket import error as socket_error
 
497
        self._should_terminate = False
 
498
        while not self._should_terminate:
 
499
            try:
 
500
                self.accept_and_serve()
 
501
            except socket_timeout:
 
502
                # just check if we're asked to stop
 
503
                pass
 
504
            except socket_error, e:
 
505
                trace.warning("client disconnected: %s", e)
 
506
                pass
 
507
 
 
508
    def get_url(self):
 
509
        """Return the url of the server"""
 
510
        return "bzr://%s:%d/" % self._server_socket.getsockname()
 
511
 
 
512
    def accept_and_serve(self):
 
513
        conn, client_addr = self._server_socket.accept()
 
514
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
515
        from_client = conn.makefile('r')
 
516
        to_client = conn.makefile('w')
 
517
        handler = SmartStreamServer(from_client, to_client,
 
518
                self.backing_transport)
 
519
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
 
520
        connection_thread.setDaemon(True)
 
521
        connection_thread.start()
 
522
 
 
523
    def start_background_thread(self):
 
524
        self._server_thread = threading.Thread(None,
 
525
                self.serve,
 
526
                name='server-' + self.get_url())
 
527
        self._server_thread.setDaemon(True)
 
528
        self._server_thread.start()
 
529
 
 
530
    def stop_background_thread(self):
 
531
        self._should_terminate = True
 
532
        # self._server_socket.close()
 
533
        # we used to join the thread, but it's not really necessary; it will
 
534
        # terminate in time
 
535
        ## self._server_thread.join()
 
536
 
 
537
 
 
538
class SmartTCPServer_for_testing(SmartTCPServer):
 
539
    """Server suitable for use by transport tests.
 
540
    
 
541
    This server is backed by the process's cwd.
 
542
    """
 
543
 
 
544
    def __init__(self):
 
545
        self._homedir = os.getcwd()
 
546
        # The server is set up by default like for ssh access: the client
 
547
        # passes filesystem-absolute paths; therefore the server must look
 
548
        # them up relative to the root directory.  it might be better to act
 
549
        # a public server and have the server rewrite paths into the test
 
550
        # directory.
 
551
        SmartTCPServer.__init__(self, transport.get_transport("file:///"))
 
552
        
 
553
    def setUp(self):
 
554
        """Set up server for testing"""
 
555
        self.start_background_thread()
 
556
 
 
557
    def tearDown(self):
 
558
        self.stop_background_thread()
 
559
 
 
560
    def get_url(self):
 
561
        """Return the url of the server"""
 
562
        host, port = self._server_socket.getsockname()
 
563
        # XXX: I think this is likely to break on windows -- self._homedir will
 
564
        # have backslashes (and maybe a drive letter?).
 
565
        #  -- Andrew Bennetts, 2006-08-29
 
566
        return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
 
567
 
 
568
    def get_bogus_url(self):
 
569
        """Return a URL which will fail to connect"""
 
570
        return 'bzr://127.0.0.1:1/'
 
571
 
 
572
 
 
573
class SmartStat(object):
43
574
 
44
575
    def __init__(self, size, mode):
45
576
        self.st_size = size
46
577
        self.st_mode = mode
47
578
 
48
579
 
49
 
class RemoteTransport(transport.ConnectedTransport):
 
580
class SmartTransport(transport.Transport):
50
581
    """Connection to a smart server.
51
582
 
52
 
    The connection holds references to the medium that can be used to send
53
 
    requests to the server.
 
583
    The connection holds references to pipes that can be used to send requests
 
584
    to the server.
54
585
 
55
586
    The connection has a notion of the current directory to which it's
56
587
    connected; this is incorporated in filenames passed to the server.
57
 
 
58
 
    This supports some higher-level RPC operations and can also be treated
 
588
    
 
589
    This supports some higher-level RPC operations and can also be treated 
59
590
    like a Transport to do file-like operations.
60
591
 
61
 
    The connection can be made over a tcp socket, an ssh pipe or a series of
62
 
    http requests.  There are concrete subclasses for each type:
63
 
    RemoteTCPTransport, etc.
 
592
    The connection can be made over a tcp socket, or (in future) an ssh pipe
 
593
    or a series of http requests.  There are concrete subclasses for each
 
594
    type: SmartTCPTransport, etc.
64
595
    """
65
596
 
66
 
    # When making a readv request, cap it at requesting 5MB of data
67
 
    _max_readv_bytes = 5*1024*1024
68
 
 
69
 
    # IMPORTANT FOR IMPLEMENTORS: RemoteTransport MUST NOT be given encoding
 
597
    # IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
70
598
    # responsibilities: Put those on SmartClient or similar. This is vital for
71
599
    # the ability to support multiple versions of the smart protocol over time:
72
 
    # RemoteTransport is an adapter from the Transport object model to the
 
600
    # SmartTransport is an adapter from the Transport object model to the 
73
601
    # SmartClient model, not an encoder.
74
602
 
75
 
    # FIXME: the medium parameter should be private, only the tests requires
76
 
    # it. It may be even clearer to define a TestRemoteTransport that handles
77
 
    # the specific cases of providing a _client and/or a _medium, and leave
78
 
    # RemoteTransport as an abstract class.
79
 
    def __init__(self, url, _from_transport=None, medium=None, _client=None):
 
603
    def __init__(self, url, clone_from=None, client=None):
80
604
        """Constructor.
81
605
 
82
 
        :param _from_transport: Another RemoteTransport instance that this
83
 
            one is being cloned from.  Attributes such as the medium will
84
 
            be reused.
85
 
 
86
 
        :param medium: The medium to use for this RemoteTransport.  If None,
87
 
            the medium from the _from_transport is shared.  If both this
88
 
            and _from_transport are None, a new medium will be built.
89
 
            _from_transport and medium cannot both be specified.
90
 
 
91
 
        :param _client: Override the _SmartClient used by this transport.  This
92
 
            should only be used for testing purposes; normally this is
93
 
            determined from the medium.
94
 
        """
95
 
        super(RemoteTransport, self).__init__(
96
 
            url, _from_transport=_from_transport)
97
 
 
98
 
        # The medium is the connection, except when we need to share it with
99
 
        # other objects (RemoteBzrDir, RemoteRepository etc). In these cases
100
 
        # what we want to share is really the shared connection.
101
 
 
102
 
        if (_from_transport is not None
103
 
            and isinstance(_from_transport, RemoteTransport)):
104
 
            _client = _from_transport._client
105
 
        elif _from_transport is None:
106
 
            # If no _from_transport is specified, we need to intialize the
107
 
            # shared medium.
108
 
            credentials = None
109
 
            if medium is None:
110
 
                medium, credentials = self._build_medium()
111
 
                if 'hpss' in debug.debug_flags:
112
 
                    trace.mutter('hpss: Built a new medium: %s',
113
 
                                 medium.__class__.__name__)
114
 
            self._shared_connection = transport._SharedConnection(medium,
115
 
                                                                  credentials,
116
 
                                                                  self.base)
117
 
        elif medium is None:
118
 
            # No medium was specified, so share the medium from the
119
 
            # _from_transport.
120
 
            medium = self._shared_connection.connection
121
 
        else:
122
 
            raise AssertionError(
123
 
                "Both _from_transport (%r) and medium (%r) passed to "
124
 
                "RemoteTransport.__init__, but these parameters are mutally "
125
 
                "exclusive." % (_from_transport, medium))
126
 
 
127
 
        if _client is None:
128
 
            self._client = client._SmartClient(medium)
129
 
        else:
130
 
            self._client = _client
131
 
 
132
 
    def _build_medium(self):
133
 
        """Create the medium if _from_transport does not provide one.
134
 
 
135
 
        The medium is analogous to the connection for ConnectedTransport: it
136
 
        allows connection sharing.
137
 
        """
138
 
        # No credentials
139
 
        return None, None
140
 
 
141
 
    def _report_activity(self, bytes, direction):
142
 
        """See Transport._report_activity.
143
 
 
144
 
        Does nothing; the smart medium will report activity triggered by a
145
 
        RemoteTransport.
146
 
        """
147
 
        pass
 
606
        :param client: ignored when clone_from is not None.
 
607
        """
 
608
        ### Technically super() here is faulty because Transport's __init__
 
609
        ### fails to take 2 parameters, and if super were to choose a silly
 
610
        ### initialisation order things would blow up. 
 
611
        if not url.endswith('/'):
 
612
            url += '/'
 
613
        super(SmartTransport, self).__init__(url)
 
614
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
 
615
                transport.split_url(url)
 
616
        if clone_from is None:
 
617
            if client is None:
 
618
                self._client = SmartStreamClient(self._connect_to_server)
 
619
            else:
 
620
                self._client = client
 
621
        else:
 
622
            # credentials may be stripped from the base in some circumstances
 
623
            # as yet to be clearly defined or documented, so copy them.
 
624
            self._username = clone_from._username
 
625
            # reuse same connection
 
626
            self._client = clone_from._client
 
627
 
 
628
    def abspath(self, relpath):
 
629
        """Return the full url to the given relative path.
 
630
        
 
631
        @param relpath: the relative path or path components
 
632
        @type relpath: str or list
 
633
        """
 
634
        return self._unparse_url(self._remote_path(relpath))
 
635
    
 
636
    def clone(self, relative_url):
 
637
        """Make a new SmartTransport related to me, sharing the same connection.
 
638
 
 
639
        This essentially opens a handle on a different remote directory.
 
640
        """
 
641
        if relative_url is None:
 
642
            return self.__class__(self.base, self)
 
643
        else:
 
644
            return self.__class__(self.abspath(relative_url), self)
148
645
 
149
646
    def is_readonly(self):
150
647
        """Smart server transport can do read/write file operations."""
151
 
        try:
152
 
            resp = self._call2('Transport.is_readonly')
153
 
        except errors.UnknownSmartMethod:
154
 
            # XXX: nasty hack: servers before 0.16 don't have a
155
 
            # 'Transport.is_readonly' verb, so we do what clients before 0.16
156
 
            # did: assume False.
157
 
            return False
158
 
        if resp == ('yes', ):
159
 
            return True
160
 
        elif resp == ('no', ):
161
 
            return False
162
 
        else:
163
 
            raise errors.UnexpectedSmartServerResponse(resp)
164
 
 
 
648
        return False
 
649
                                                   
165
650
    def get_smart_client(self):
166
 
        return self._get_connection()
 
651
        return self._client
 
652
                                                   
 
653
    def _unparse_url(self, path):
 
654
        """Return URL for a path.
167
655
 
168
 
    def get_smart_medium(self):
169
 
        return self._get_connection()
 
656
        :see: SFTPUrlHandling._unparse_url
 
657
        """
 
658
        # TODO: Eventually it should be possible to unify this with
 
659
        # SFTPUrlHandling._unparse_url?
 
660
        if path == '':
 
661
            path = '/'
 
662
        path = urllib.quote(path)
 
663
        netloc = urllib.quote(self._host)
 
664
        if self._username is not None:
 
665
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
 
666
        if self._port is not None:
 
667
            netloc = '%s:%d' % (netloc, self._port)
 
668
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
170
669
 
171
670
    def _remote_path(self, relpath):
172
671
        """Returns the Unicode version of the absolute path for relpath."""
173
672
        return self._combine_paths(self._path, relpath)
174
673
 
175
 
    def _call(self, method, *args):
176
 
        resp = self._call2(method, *args)
177
 
        self._ensure_ok(resp)
178
 
 
179
 
    def _call2(self, method, *args):
180
 
        """Call a method on the remote server."""
181
 
        try:
182
 
            return self._client.call(method, *args)
183
 
        except errors.ErrorFromSmartServer, err:
184
 
            # The first argument, if present, is always a path.
185
 
            if args:
186
 
                context = {'relpath': args[0]}
187
 
            else:
188
 
                context = {}
189
 
            self._translate_error(err, **context)
190
 
 
191
 
    def _call_with_body_bytes(self, method, args, body):
192
 
        """Call a method on the remote server with body bytes."""
193
 
        try:
194
 
            return self._client.call_with_body_bytes(method, args, body)
195
 
        except errors.ErrorFromSmartServer, err:
196
 
            # The first argument, if present, is always a path.
197
 
            if args:
198
 
                context = {'relpath': args[0]}
199
 
            else:
200
 
                context = {}
201
 
            self._translate_error(err, **context)
202
 
 
203
674
    def has(self, relpath):
204
675
        """Indicate whether a remote file of the given name exists or not.
205
676
 
206
677
        :see: Transport.has()
207
678
        """
208
 
        resp = self._call2('has', self._remote_path(relpath))
 
679
        resp = self._client._call('has', self._remote_path(relpath))
209
680
        if resp == ('yes', ):
210
681
            return True
211
682
        elif resp == ('no', ):
212
683
            return False
213
684
        else:
214
 
            raise errors.UnexpectedSmartServerResponse(resp)
 
685
            self._translate_error(resp)
215
686
 
216
687
    def get(self, relpath):
217
688
        """Return file-like object reading the contents of a remote file.
218
 
 
 
689
        
219
690
        :see: Transport.get_bytes()/get_file()
220
691
        """
221
 
        return StringIO(self.get_bytes(relpath))
222
 
 
223
 
    def get_bytes(self, relpath):
224
692
        remote = self._remote_path(relpath)
225
 
        try:
226
 
            resp, response_handler = self._client.call_expecting_body('get', remote)
227
 
        except errors.ErrorFromSmartServer, err:
228
 
            self._translate_error(err, relpath)
 
693
        resp = self._client._call('get', remote)
229
694
        if resp != ('ok', ):
230
 
            response_handler.cancel_read_body()
231
 
            raise errors.UnexpectedSmartServerResponse(resp)
232
 
        return response_handler.read_body_bytes()
 
695
            self._translate_error(resp, relpath)
 
696
        return StringIO(self._client._recv_bulk())
233
697
 
234
698
    def _serialise_optional_mode(self, mode):
235
699
        if mode is None:
238
702
            return '%d' % mode
239
703
 
240
704
    def mkdir(self, relpath, mode=None):
241
 
        resp = self._call2('mkdir', self._remote_path(relpath),
242
 
            self._serialise_optional_mode(mode))
243
 
 
244
 
    def open_write_stream(self, relpath, mode=None):
245
 
        """See Transport.open_write_stream."""
246
 
        self.put_bytes(relpath, "", mode)
247
 
        result = transport.AppendBasedFileStream(self, relpath)
248
 
        transport._file_streams[self.abspath(relpath)] = result
249
 
        return result
 
705
        resp = self._client._call('mkdir', 
 
706
                                  self._remote_path(relpath), 
 
707
                                  self._serialise_optional_mode(mode))
 
708
        self._translate_error(resp)
250
709
 
251
710
    def put_bytes(self, relpath, upload_contents, mode=None):
252
711
        # FIXME: upload_file is probably not safe for non-ascii characters -
253
712
        # should probably just pass all parameters as length-delimited
254
713
        # strings?
255
 
        if type(upload_contents) is unicode:
256
 
            # Although not strictly correct, we raise UnicodeEncodeError to be
257
 
            # compatible with other transports.
258
 
            raise UnicodeEncodeError(
259
 
                'undefined', upload_contents, 0, 1,
260
 
                'put_bytes must be given bytes, not unicode.')
261
 
        resp = self._call_with_body_bytes('put',
 
714
        resp = self._client._call_with_upload(
 
715
            'put',
262
716
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
263
717
            upload_contents)
264
 
        self._ensure_ok(resp)
265
 
        return len(upload_contents)
 
718
        self._translate_error(resp)
266
719
 
267
720
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
268
721
                             create_parent_dir=False,
273
726
        if create_parent_dir:
274
727
            create_parent_str = 'T'
275
728
 
276
 
        resp = self._call_with_body_bytes(
 
729
        resp = self._client._call_with_upload(
277
730
            'put_non_atomic',
278
731
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
279
732
             create_parent_str, self._serialise_optional_mode(dir_mode)),
280
733
            bytes)
281
 
        self._ensure_ok(resp)
 
734
        self._translate_error(resp)
282
735
 
283
736
    def put_file(self, relpath, upload_file, mode=None):
284
737
        # its not ideal to seek back, but currently put_non_atomic_file depends
300
753
 
301
754
    def append_file(self, relpath, from_file, mode=None):
302
755
        return self.append_bytes(relpath, from_file.read(), mode)
303
 
 
 
756
        
304
757
    def append_bytes(self, relpath, bytes, mode=None):
305
 
        resp = self._call_with_body_bytes(
 
758
        resp = self._client._call_with_upload(
306
759
            'append',
307
760
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
308
761
            bytes)
309
762
        if resp[0] == 'appended':
310
763
            return int(resp[1])
311
 
        raise errors.UnexpectedSmartServerResponse(resp)
 
764
        self._translate_error(resp)
312
765
 
313
766
    def delete(self, relpath):
314
 
        resp = self._call2('delete', self._remote_path(relpath))
315
 
        self._ensure_ok(resp)
316
 
 
317
 
    def external_url(self):
318
 
        """See bzrlib.transport.Transport.external_url."""
319
 
        # the external path for RemoteTransports is the base
320
 
        return self.base
321
 
 
322
 
    def recommended_page_size(self):
323
 
        """Return the recommended page size for this transport."""
324
 
        return 64 * 1024
325
 
 
326
 
    def _readv(self, relpath, offsets):
 
767
        resp = self._client._call('delete', self._remote_path(relpath))
 
768
        self._translate_error(resp)
 
769
 
 
770
    def readv(self, relpath, offsets):
327
771
        if not offsets:
328
772
            return
329
773
 
330
774
        offsets = list(offsets)
331
775
 
332
776
        sorted_offsets = sorted(offsets)
 
777
        # turn the list of offsets into a stack
 
778
        offset_stack = iter(offsets)
 
779
        cur_offset_and_size = offset_stack.next()
333
780
        coalesced = list(self._coalesce_offsets(sorted_offsets,
334
781
                               limit=self._max_readv_combine,
335
 
                               fudge_factor=self._bytes_to_read_before_seek,
336
 
                               max_size=self._max_readv_bytes))
337
 
 
338
 
        # now that we've coallesced things, avoid making enormous requests
339
 
        requests = []
340
 
        cur_request = []
341
 
        cur_len = 0
342
 
        for c in coalesced:
343
 
            if c.length + cur_len > self._max_readv_bytes:
344
 
                requests.append(cur_request)
345
 
                cur_request = [c]
346
 
                cur_len = c.length
347
 
                continue
348
 
            cur_request.append(c)
349
 
            cur_len += c.length
350
 
        if cur_request:
351
 
            requests.append(cur_request)
352
 
        if 'hpss' in debug.debug_flags:
353
 
            trace.mutter('%s.readv %s offsets => %s coalesced'
354
 
                         ' => %s requests (%s)',
355
 
                         self.__class__.__name__, len(offsets), len(coalesced),
356
 
                         len(requests), sum(map(len, requests)))
 
782
                               fudge_factor=self._bytes_to_read_before_seek))
 
783
 
 
784
 
 
785
        resp = self._client._call_with_upload(
 
786
            'readv',
 
787
            (self._remote_path(relpath),),
 
788
            self._client._serialise_offsets((c.start, c.length) for c in coalesced))
 
789
 
 
790
        if resp[0] != 'readv':
 
791
            # This should raise an exception
 
792
            self._translate_error(resp)
 
793
            return
 
794
 
 
795
        data = self._client._recv_bulk()
357
796
        # Cache the results, but only until they have been fulfilled
358
797
        data_map = {}
359
 
        # turn the list of offsets into a single stack to iterate
360
 
        offset_stack = iter(offsets)
361
 
        # using a list so it can be modified when passing down and coming back
362
 
        next_offset = [offset_stack.next()]
363
 
        for cur_request in requests:
364
 
            try:
365
 
                result = self._client.call_with_body_readv_array(
366
 
                    ('readv', self._remote_path(relpath),),
367
 
                    [(c.start, c.length) for c in cur_request])
368
 
                resp, response_handler = result
369
 
            except errors.ErrorFromSmartServer, err:
370
 
                self._translate_error(err, relpath)
371
 
 
372
 
            if resp[0] != 'readv':
373
 
                # This should raise an exception
374
 
                response_handler.cancel_read_body()
375
 
                raise errors.UnexpectedSmartServerResponse(resp)
376
 
 
377
 
            for res in self._handle_response(offset_stack, cur_request,
378
 
                                             response_handler,
379
 
                                             data_map,
380
 
                                             next_offset):
381
 
                yield res
382
 
 
383
 
    def _handle_response(self, offset_stack, coalesced, response_handler,
384
 
                         data_map, next_offset):
385
 
        cur_offset_and_size = next_offset[0]
386
 
        # FIXME: this should know how many bytes are needed, for clarity.
387
 
        data = response_handler.read_body_bytes()
388
 
        data_offset = 0
389
798
        for c_offset in coalesced:
390
799
            if len(data) < c_offset.length:
391
800
                raise errors.ShortReadvError(relpath, c_offset.start,
392
801
                            c_offset.length, actual=len(data))
393
802
            for suboffset, subsize in c_offset.ranges:
394
803
                key = (c_offset.start+suboffset, subsize)
395
 
                this_data = data[data_offset+suboffset:
396
 
                                 data_offset+suboffset+subsize]
397
 
                # Special case when the data is in-order, rather than packing
398
 
                # into a map and then back out again. Benchmarking shows that
399
 
                # this has 100% hit rate, but leave in the data_map work just
400
 
                # in case.
401
 
                # TODO: Could we get away with using buffer() to avoid the
402
 
                #       memory copy?  Callers would need to realize they may
403
 
                #       not have a real string.
404
 
                if key == cur_offset_and_size:
405
 
                    yield cur_offset_and_size[0], this_data
406
 
                    cur_offset_and_size = next_offset[0] = offset_stack.next()
407
 
                else:
408
 
                    data_map[key] = this_data
409
 
            data_offset += c_offset.length
 
804
                data_map[key] = data[suboffset:suboffset+subsize]
 
805
            data = data[c_offset.length:]
410
806
 
411
807
            # Now that we've read some data, see if we can yield anything back
412
808
            while cur_offset_and_size in data_map:
413
809
                this_data = data_map.pop(cur_offset_and_size)
414
810
                yield cur_offset_and_size[0], this_data
415
 
                cur_offset_and_size = next_offset[0] = offset_stack.next()
 
811
                cur_offset_and_size = offset_stack.next()
416
812
 
417
813
    def rename(self, rel_from, rel_to):
418
 
        self._call('rename',
 
814
        self._call('rename', 
419
815
                   self._remote_path(rel_from),
420
816
                   self._remote_path(rel_to))
421
817
 
422
818
    def move(self, rel_from, rel_to):
423
 
        self._call('move',
 
819
        self._call('move', 
424
820
                   self._remote_path(rel_from),
425
821
                   self._remote_path(rel_to))
426
822
 
427
823
    def rmdir(self, relpath):
428
824
        resp = self._call('rmdir', self._remote_path(relpath))
429
825
 
430
 
    def _ensure_ok(self, resp):
431
 
        if resp[0] != 'ok':
432
 
            raise errors.UnexpectedSmartServerResponse(resp)
433
 
 
434
 
    def _translate_error(self, err, relpath=None):
435
 
        remote._translate_error(err, path=relpath)
 
826
    def _call(self, method, *args):
 
827
        resp = self._client._call(method, *args)
 
828
        self._translate_error(resp)
 
829
 
 
830
    def _translate_error(self, resp, orig_path=None):
 
831
        """Raise an exception from a response"""
 
832
        what = resp[0]
 
833
        if what == 'ok':
 
834
            return
 
835
        elif what == 'NoSuchFile':
 
836
            if orig_path is not None:
 
837
                error_path = orig_path
 
838
            else:
 
839
                error_path = resp[1]
 
840
            raise errors.NoSuchFile(error_path)
 
841
        elif what == 'error':
 
842
            raise errors.SmartProtocolError(unicode(resp[1]))
 
843
        elif what == 'FileExists':
 
844
            raise errors.FileExists(resp[1])
 
845
        elif what == 'DirectoryNotEmpty':
 
846
            raise errors.DirectoryNotEmpty(resp[1])
 
847
        elif what == 'ShortReadvError':
 
848
            raise errors.ShortReadvError(resp[1], int(resp[2]),
 
849
                                         int(resp[3]), int(resp[4]))
 
850
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
 
851
            encoding = str(resp[1]) # encoding must always be a string
 
852
            val = resp[2]
 
853
            start = int(resp[3])
 
854
            end = int(resp[4])
 
855
            reason = str(resp[5]) # reason must always be a string
 
856
            if val.startswith('u:'):
 
857
                val = val[2:]
 
858
            elif val.startswith('s:'):
 
859
                val = val[2:].decode('base64')
 
860
            if what == 'UnicodeDecodeError':
 
861
                raise UnicodeDecodeError(encoding, val, start, end, reason)
 
862
            elif what == 'UnicodeEncodeError':
 
863
                raise UnicodeEncodeError(encoding, val, start, end, reason)
 
864
        else:
 
865
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
 
866
 
 
867
    def _send_tuple(self, args):
 
868
        self._client._send_tuple(args)
 
869
 
 
870
    def _recv_tuple(self):
 
871
        return self._client._recv_tuple()
436
872
 
437
873
    def disconnect(self):
438
 
        self.get_smart_medium().disconnect()
 
874
        self._client.disconnect()
 
875
 
 
876
    def delete_tree(self, relpath):
 
877
        raise errors.TransportNotPossible('readonly transport')
439
878
 
440
879
    def stat(self, relpath):
441
 
        resp = self._call2('stat', self._remote_path(relpath))
 
880
        resp = self._client._call('stat', self._remote_path(relpath))
442
881
        if resp[0] == 'stat':
443
 
            return _SmartStat(int(resp[1]), int(resp[2], 8))
444
 
        raise errors.UnexpectedSmartServerResponse(resp)
 
882
            return SmartStat(int(resp[1]), int(resp[2], 8))
 
883
        else:
 
884
            self._translate_error(resp)
445
885
 
446
886
    ## def lock_read(self, relpath):
447
887
    ##     """Lock the given file for shared (read) access.
460
900
        return True
461
901
 
462
902
    def list_dir(self, relpath):
463
 
        resp = self._call2('list_dir', self._remote_path(relpath))
 
903
        resp = self._client._call('list_dir',
 
904
                                  self._remote_path(relpath))
464
905
        if resp[0] == 'names':
465
906
            return [name.encode('ascii') for name in resp[1:]]
466
 
        raise errors.UnexpectedSmartServerResponse(resp)
 
907
        else:
 
908
            self._translate_error(resp)
467
909
 
468
910
    def iter_files_recursive(self):
469
 
        resp = self._call2('iter_files_recursive', self._remote_path(''))
 
911
        resp = self._client._call('iter_files_recursive',
 
912
                                  self._remote_path(''))
470
913
        if resp[0] == 'names':
471
914
            return resp[1:]
472
 
        raise errors.UnexpectedSmartServerResponse(resp)
473
 
 
474
 
 
475
 
class RemoteTCPTransport(RemoteTransport):
476
 
    """Connection to smart server over plain tcp.
477
 
 
478
 
    This is essentially just a factory to get 'RemoteTransport(url,
479
 
        SmartTCPClientMedium).
480
 
    """
481
 
 
482
 
    def _build_medium(self):
483
 
        client_medium = medium.SmartTCPClientMedium(
484
 
            self._host, self._port, self.base)
485
 
        return client_medium, None
486
 
 
487
 
 
488
 
class RemoteTCPTransportV2Only(RemoteTransport):
489
 
    """Connection to smart server over plain tcp with the client hard-coded to
490
 
    assume protocol v2 and remote server version <= 1.6.
491
 
 
492
 
    This should only be used for testing.
493
 
    """
494
 
 
495
 
    def _build_medium(self):
496
 
        client_medium = medium.SmartTCPClientMedium(
497
 
            self._host, self._port, self.base)
498
 
        client_medium._protocol_version = 2
499
 
        client_medium._remember_remote_is_before((1, 6))
500
 
        return client_medium, None
501
 
 
502
 
 
503
 
class RemoteSSHTransport(RemoteTransport):
504
 
    """Connection to smart server over SSH.
505
 
 
506
 
    This is essentially just a factory to get 'RemoteTransport(url,
507
 
        SmartSSHClientMedium).
508
 
    """
509
 
 
510
 
    def _build_medium(self):
511
 
        location_config = config.LocationConfig(self.base)
512
 
        bzr_remote_path = location_config.get_bzr_remote_path()
513
 
        user = self._user
514
 
        if user is None:
515
 
            auth = config.AuthenticationConfig()
516
 
            user = auth.get_user('ssh', self._host, self._port)
517
 
        client_medium = medium.SmartSSHClientMedium(self._host, self._port,
518
 
            user, self._password, self.base,
519
 
            bzr_remote_path=bzr_remote_path)
520
 
        return client_medium, (user, self._password)
521
 
 
522
 
 
523
 
class RemoteHTTPTransport(RemoteTransport):
524
 
    """Just a way to connect between a bzr+http:// url and http://.
525
 
 
526
 
    This connection operates slightly differently than the RemoteSSHTransport.
527
 
    It uses a plain http:// transport underneath, which defines what remote
528
 
    .bzr/smart URL we are connected to. From there, all paths that are sent are
529
 
    sent as relative paths, this way, the remote side can properly
530
 
    de-reference them, since it is likely doing rewrite rules to translate an
531
 
    HTTP path into a local path.
532
 
    """
533
 
 
534
 
    def __init__(self, base, _from_transport=None, http_transport=None):
535
 
        if http_transport is None:
536
 
            # FIXME: the password may be lost here because it appears in the
537
 
            # url only for an intial construction (when the url came from the
538
 
            # command-line).
539
 
            http_url = base[len('bzr+'):]
540
 
            self._http_transport = transport.get_transport(http_url)
541
915
        else:
542
 
            self._http_transport = http_transport
543
 
        super(RemoteHTTPTransport, self).__init__(
544
 
            base, _from_transport=_from_transport)
545
 
 
546
 
    def _build_medium(self):
547
 
        # We let http_transport take care of the credentials
548
 
        return self._http_transport.get_smart_medium(), None
549
 
 
550
 
    def _remote_path(self, relpath):
551
 
        """After connecting, HTTP Transport only deals in relative URLs."""
552
 
        # Adjust the relpath based on which URL this smart transport is
553
 
        # connected to.
554
 
        http_base = urlutils.normalize_url(self.get_smart_medium().base)
555
 
        url = urlutils.join(self.base[len('bzr+'):], relpath)
556
 
        url = urlutils.normalize_url(url)
557
 
        return urlutils.relative_url(http_base, url)
558
 
 
559
 
    def clone(self, relative_url):
560
 
        """Make a new RemoteHTTPTransport related to me.
561
 
 
562
 
        This is re-implemented rather than using the default
563
 
        RemoteTransport.clone() because we must be careful about the underlying
564
 
        http transport.
565
 
 
566
 
        Also, the cloned smart transport will POST to the same .bzr/smart
567
 
        location as this transport (although obviously the relative paths in the
568
 
        smart requests may be different).  This is so that the server doesn't
569
 
        have to handle .bzr/smart requests at arbitrary places inside .bzr
570
 
        directories, just at the initial URL the user uses.
 
916
            self._translate_error(resp)
 
917
 
 
918
 
 
919
class SmartStreamClient(SmartProtocolBase):
 
920
    """Connection to smart server over two streams"""
 
921
 
 
922
    def __init__(self, connect_func):
 
923
        self._connect_func = connect_func
 
924
        self._connected = False
 
925
 
 
926
    def __del__(self):
 
927
        self.disconnect()
 
928
 
 
929
    def _ensure_connection(self):
 
930
        if not self._connected:
 
931
            self._in, self._out = self._connect_func()
 
932
            self._connected = True
 
933
 
 
934
    def _send_tuple(self, args):
 
935
        self._ensure_connection()
 
936
        return self._write_and_flush(_encode_tuple(args))
 
937
 
 
938
    def _send_bulk_data(self, body):
 
939
        self._ensure_connection()
 
940
        SmartProtocolBase._send_bulk_data(self, body)
 
941
        
 
942
    def _recv_bulk(self):
 
943
        self._ensure_connection()
 
944
        return SmartProtocolBase._recv_bulk(self)
 
945
 
 
946
    def _recv_tuple(self):
 
947
        self._ensure_connection()
 
948
        return SmartProtocolBase._recv_tuple(self)
 
949
 
 
950
    def _recv_trailer(self):
 
951
        self._ensure_connection()
 
952
        return SmartProtocolBase._recv_trailer(self)
 
953
 
 
954
    def disconnect(self):
 
955
        """Close connection to the server"""
 
956
        if self._connected:
 
957
            self._out.close()
 
958
            self._in.close()
 
959
 
 
960
    def _call(self, *args):
 
961
        self._send_tuple(args)
 
962
        return self._recv_tuple()
 
963
 
 
964
    def _call_with_upload(self, method, args, body):
 
965
        """Call an rpc, supplying bulk upload data.
 
966
 
 
967
        :param method: method name to call
 
968
        :param args: parameter args tuple
 
969
        :param body: upload body as a byte string
571
970
        """
572
 
        if relative_url:
573
 
            abs_url = self.abspath(relative_url)
574
 
        else:
575
 
            abs_url = self.base
576
 
        return RemoteHTTPTransport(abs_url,
577
 
                                   _from_transport=self,
578
 
                                   http_transport=self._http_transport)
579
 
 
580
 
    def _redirected_to(self, source, target):
581
 
        """See transport._redirected_to"""
582
 
        redirected = self._http_transport._redirected_to(source, target)
583
 
        if (redirected is not None
584
 
            and isinstance(redirected, type(self._http_transport))):
585
 
            return RemoteHTTPTransport('bzr+' + redirected.external_url(),
586
 
                                       http_transport=redirected)
587
 
        else:
588
 
            # Either None or a transport for a different protocol
589
 
            return redirected
590
 
 
591
 
 
592
 
class HintingSSHTransport(transport.Transport):
593
 
    """Simple transport that handles ssh:// and points out bzr+ssh://."""
594
 
 
595
 
    def __init__(self, url):
596
 
        raise errors.UnsupportedProtocol(url,
597
 
            'bzr supports bzr+ssh to operate over ssh, use "bzr+%s".' % url)
 
971
        self._send_tuple((method,) + args)
 
972
        self._send_bulk_data(body)
 
973
        return self._recv_tuple()
 
974
 
 
975
    def query_version(self):
 
976
        """Return protocol version number of the server."""
 
977
        # XXX: should make sure it's empty
 
978
        self._send_tuple(('hello',))
 
979
        resp = self._recv_tuple()
 
980
        if resp == ('ok', '1'):
 
981
            return 1
 
982
        else:
 
983
            raise errors.SmartProtocolError("bad response %r" % (resp,))
 
984
 
 
985
 
 
986
class SmartTCPTransport(SmartTransport):
 
987
    """Connection to smart server over plain tcp"""
 
988
 
 
989
    def __init__(self, url, clone_from=None):
 
990
        super(SmartTCPTransport, self).__init__(url, clone_from)
 
991
        try:
 
992
            self._port = int(self._port)
 
993
        except (ValueError, TypeError), e:
 
994
            raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
 
995
        self._socket = None
 
996
 
 
997
    def _connect_to_server(self):
 
998
        self._socket = socket.socket()
 
999
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
1000
        result = self._socket.connect_ex((self._host, int(self._port)))
 
1001
        if result:
 
1002
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
 
1003
                    (self._host, self._port, os.strerror(result)))
 
1004
        # TODO: May be more efficient to just treat them as sockets
 
1005
        # throughout?  But what about pipes to ssh?...
 
1006
        to_server = self._socket.makefile('w')
 
1007
        from_server = self._socket.makefile('r')
 
1008
        return from_server, to_server
 
1009
 
 
1010
    def disconnect(self):
 
1011
        super(SmartTCPTransport, self).disconnect()
 
1012
        # XXX: Is closing the socket as well as closing the files really
 
1013
        # necessary?
 
1014
        if self._socket is not None:
 
1015
            self._socket.close()
 
1016
 
 
1017
try:
 
1018
    from bzrlib.transport import sftp
 
1019
except errors.ParamikoNotPresent:
 
1020
    # no paramiko, no SSHTransport.
 
1021
    pass
 
1022
else:
 
1023
    class SmartSSHTransport(SmartTransport):
 
1024
        """Connection to smart server over SSH."""
 
1025
 
 
1026
        def __init__(self, url, clone_from=None):
 
1027
            # TODO: all this probably belongs in the parent class.
 
1028
            super(SmartSSHTransport, self).__init__(url, clone_from)
 
1029
            try:
 
1030
                if self._port is not None:
 
1031
                    self._port = int(self._port)
 
1032
            except (ValueError, TypeError), e:
 
1033
                raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
 
1034
 
 
1035
        def _connect_to_server(self):
 
1036
            # XXX: don't hardcode vendor
 
1037
            # XXX: cannot pass password to SSHSubprocess yet
 
1038
            if self._password is not None:
 
1039
                raise errors.InvalidURL("SSH smart transport doesn't handle passwords")
 
1040
            self._ssh_connection = sftp.SSHSubprocess(self._host, 'openssh',
 
1041
                    port=self._port, user=self._username,
 
1042
                    command=['bzr', 'serve', '--inet'])
 
1043
            return self._ssh_connection.get_filelike_channels()
 
1044
 
 
1045
        def disconnect(self):
 
1046
            super(SmartSSHTransport, self).disconnect()
 
1047
            self._ssh_connection.close()
598
1048
 
599
1049
 
600
1050
def get_test_permutations():
601
 
    """Return (transport, server) permutations for testing."""
602
 
    ### We may need a little more test framework support to construct an
603
 
    ### appropriate RemoteTransport in the future.
604
 
    from bzrlib.tests import test_server
605
 
    return [(RemoteTCPTransport, test_server.SmartTCPServer_for_testing)]
 
1051
    """Return (transport, server) permutations for testing"""
 
1052
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]