/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

Merge from bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Smart-server protocol, client and server.
 
18
 
 
19
Requests are sent as a command and list of arguments, followed by optional
 
20
bulk body data.  Responses are similarly a response and list of arguments,
 
21
followed by bulk body data. ::
 
22
 
 
23
  SEP := '\001'
 
24
    Fields are separated by Ctrl-A.
 
25
  BULK_DATA := CHUNK+ TRAILER
 
26
    Chunks can be repeated as many times as necessary.
 
27
  CHUNK := CHUNK_LEN CHUNK_BODY
 
28
  CHUNK_LEN := DIGIT+ NEWLINE
 
29
    Gives the number of bytes in the following chunk.
 
30
  CHUNK_BODY := BYTE[chunk_len]
 
31
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
 
32
  SUCCESS_TRAILER := 'done' NEWLINE
 
33
  ERROR_TRAILER := 
 
34
 
 
35
Paths are passed across the network.  The client needs to see a namespace that
 
36
includes any repository that might need to be referenced, and the client needs
 
37
to know about a root directory beyond which it cannot ascend.
 
38
 
 
39
Servers run over ssh will typically want to be able to access any path the user 
 
40
can access.  Public servers on the other hand (which might be over http, ssh
 
41
or tcp) will typically want to restrict access to only a particular directory 
 
42
and its children, so will want to do a software virtual root at that level.
 
43
In other words they'll want to rewrite incoming paths to be under that level
 
44
(and prevent escaping using ../ tricks.)
 
45
 
 
46
URLs that include ~ should probably be passed across to the server verbatim
 
47
and the server can expand them.  This will proably not be meaningful when 
 
48
limited to a directory?
 
49
"""
 
50
 
 
51
 
 
52
 
 
53
# TODO: A plain integer from query_version is too simple; should give some
 
54
# capabilities too?
 
55
 
 
56
# TODO: Server should probably catch exceptions within itself and send them
 
57
# back across the network.  (But shouldn't catch KeyboardInterrupt etc)
 
58
# Also needs to somehow report protocol errors like bad requests.  Need to
 
59
# consider how we'll handle error reporting, e.g. if we get halfway through a
 
60
# bulk transfer and then something goes wrong.
 
61
 
 
62
# TODO: Standard marker at start of request/response lines?
 
63
 
 
64
# TODO: Make each request and response self-validatable, e.g. with checksums.
 
65
#
 
66
# TODO: get/put objects could be changed to gradually read back the data as it
 
67
# comes across the network
 
68
#
 
69
# TODO: What should the server do if it hits an error and has to terminate?
 
70
#
 
71
# TODO: is it useful to allow multiple chunks in the bulk data?
 
72
#
 
73
# TODO: If we get an exception during transmission of bulk data we can't just
 
74
# emit the exception because it won't be seen.
 
75
#   John proposes:  I think it would be worthwhile to have a header on each
 
76
#   chunk, that indicates it is another chunk. Then you can send an 'error'
 
77
#   chunk as long as you finish the previous chunk.
 
78
#
 
79
# TODO: Clone method on Transport; should work up towards parent directory;
 
80
# unclear how this should be stored or communicated to the server... maybe
 
81
# just pass it on all relevant requests?
 
82
#
 
83
# TODO: Better name than clone() for changing between directories.  How about
 
84
# open_dir or change_dir or chdir?
 
85
#
 
86
# TODO: Is it really good to have the notion of current directory within the
 
87
# connection?  Perhaps all Transports should factor out a common connection
 
88
# from the thing that has the directory context?
 
89
#
 
90
# TODO: Pull more things common to sftp and ssh to a higher level.
 
91
#
 
92
# TODO: The server that manages a connection should be quite small and retain
 
93
# minimum state because each of the requests are supposed to be stateless.
 
94
# Then we can write another implementation that maps to http.
 
95
#
 
96
# TODO: What to do when a client connection is garbage collected?  Maybe just
 
97
# abruptly drop the connection?
 
98
#
 
99
# TODO: Server in some cases will need to restrict access to files outside of
 
100
# a particular root directory.  LocalTransport doesn't do anything to stop you
 
101
# ascending above the base directory, so we need to prevent paths
 
102
# containing '..' in either the server or transport layers.  (Also need to
 
103
# consider what happens if someone creates a symlink pointing outside the 
 
104
# directory tree...)
 
105
#
 
106
# TODO: Server should rebase absolute paths coming across the network to put
 
107
# them under the virtual root, if one is in use.  LocalTransport currently
 
108
# doesn't do that; if you give it an absolute path it just uses it.
 
109
 
110
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
 
111
# urlescape them instead.  Indeed possibly this should just literally be
 
112
# http-over-ssh.
 
113
#
 
114
# FIXME: This transport, with several others, has imperfect handling of paths
 
115
# within urls.  It'd probably be better for ".." from a root to raise an error
 
116
# rather than return the same directory as we do at present.
 
117
#
 
118
# TODO: Rather than working at the Transport layer we want a Branch,
 
119
# Repository or BzrDir objects that talk to a server.
 
120
#
 
121
# TODO: Probably want some way for server commands to gradually produce body
 
122
# data rather than passing it as a string; they could perhaps pass an
 
123
# iterator-like callback that will gradually yield data; it probably needs a
 
124
# close() method that will always be closed to do any necessary cleanup.
 
125
#
 
126
# TODO: Split the actual smart server from the ssh encoding of it.
 
127
#
 
128
# TODO: Perhaps support file-level readwrite operations over the transport
 
129
# too.
 
130
#
 
131
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
 
132
# branch doing file-level operations.
 
133
#
 
134
# TODO: jam 20060915 _decode_tuple is acting directly on input over
 
135
#       the socket, and it assumes everything is UTF8 sections separated
 
136
#       by \001. Which means a request like '\002' Will abort the connection
 
137
#       because of a UnicodeDecodeError. It does look like invalid data will
 
138
#       kill the SmartStreamServer, but only with an abort + exception, and 
 
139
#       the overall server shouldn't die.
 
140
 
 
141
from cStringIO import StringIO
 
142
import errno
 
143
import os
 
144
import socket
 
145
import sys
 
146
import tempfile
 
147
import threading
 
148
import urllib
 
149
import urlparse
 
150
 
 
151
from bzrlib import (
 
152
    bzrdir,
 
153
    errors,
 
154
    revision,
 
155
    transport,
 
156
    trace,
 
157
    urlutils,
 
158
    )
 
159
from bzrlib.bundle.serializer import write_bundle
 
160
from bzrlib.trace import mutter
 
161
from bzrlib.transport import local
 
162
 
 
163
# must do this otherwise urllib can't parse the urls properly :(
 
164
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
 
165
    transport.register_urlparse_netloc_protocol(scheme)
 
166
del scheme
 
167
 
 
168
 
 
169
def _recv_tuple(from_file):
 
170
    req_line = from_file.readline()
 
171
    return _decode_tuple(req_line)
 
172
 
 
173
 
 
174
def _decode_tuple(req_line):
 
175
    if req_line == None or req_line == '':
 
176
        return None
 
177
    if req_line[-1] != '\n':
 
178
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
 
179
    return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
 
180
 
 
181
 
 
182
def _encode_tuple(args):
 
183
    """Encode the tuple args to a bytestream."""
 
184
    return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
 
185
 
 
186
 
 
187
class SmartProtocolBase(object):
 
188
    """Methods common to client and server"""
 
189
 
 
190
    def _send_bulk_data(self, body):
 
191
        """Send chunked body data"""
 
192
        assert isinstance(body, str)
 
193
        bytes = ''.join(('%d\n' % len(body), body, 'done\n'))
 
194
        self._write_and_flush(bytes)
 
195
 
 
196
    # TODO: this only actually accomodates a single block; possibly should support
 
197
    # multiple chunks?
 
198
    def _recv_bulk(self):
 
199
        chunk_len = self._in.readline()
 
200
        try:
 
201
            chunk_len = int(chunk_len)
 
202
        except ValueError:
 
203
            raise errors.SmartProtocolError("bad chunk length line %r" % chunk_len)
 
204
        bulk = self._in.read(chunk_len)
 
205
        if len(bulk) != chunk_len:
 
206
            raise errors.SmartProtocolError("short read fetching bulk data chunk")
 
207
        self._recv_trailer()
 
208
        return bulk
 
209
 
 
210
    def _recv_tuple(self):
 
211
        return _recv_tuple(self._in)
 
212
 
 
213
    def _recv_trailer(self):
 
214
        resp = self._recv_tuple()
 
215
        if resp == ('done', ):
 
216
            return
 
217
        else:
 
218
            self._translate_error(resp)
 
219
 
 
220
    def _serialise_offsets(self, offsets):
 
221
        """Serialise a readv offset list."""
 
222
        txt = []
 
223
        for start, length in offsets:
 
224
            txt.append('%d,%d' % (start, length))
 
225
        return '\n'.join(txt)
 
226
 
 
227
    def _write_and_flush(self, bytes):
 
228
        """Write bytes to self._out and flush it."""
 
229
        # XXX: this will be inefficient.  Just ask Robert.
 
230
        self._out.write(bytes)
 
231
        self._out.flush()
 
232
 
 
233
 
 
234
class SmartStreamServer(SmartProtocolBase):
 
235
    """Handles smart commands coming over a stream.
 
236
 
 
237
    The stream may be a pipe connected to sshd, or a tcp socket, or an
 
238
    in-process fifo for testing.
 
239
 
 
240
    One instance is created for each connected client; it can serve multiple
 
241
    requests in the lifetime of the connection.
 
242
 
 
243
    The server passes requests through to an underlying backing transport, 
 
244
    which will typically be a LocalTransport looking at the server's filesystem.
 
245
    """
 
246
 
 
247
    def __init__(self, in_file, out_file, backing_transport):
 
248
        """Construct new server.
 
249
 
 
250
        :param in_file: Python file from which requests can be read.
 
251
        :param out_file: Python file to write responses.
 
252
        :param backing_transport: Transport for the directory served.
 
253
        """
 
254
        self._in = in_file
 
255
        self._out = out_file
 
256
        self.smart_server = SmartServer(backing_transport)
 
257
        # server can call back to us to get bulk data - this is not really
 
258
        # ideal, they should get it per request instead
 
259
        self.smart_server._recv_body = self._recv_bulk
 
260
 
 
261
    def _recv_tuple(self):
 
262
        """Read a request from the client and return as a tuple.
 
263
        
 
264
        Returns None at end of file (if the client closed the connection.)
 
265
        """
 
266
        return _recv_tuple(self._in)
 
267
 
 
268
    def _send_tuple(self, args):
 
269
        """Send response header"""
 
270
        return self._write_and_flush(_encode_tuple(args))
 
271
 
 
272
    def _send_error_and_disconnect(self, exception):
 
273
        self._send_tuple(('error', str(exception)))
 
274
        ## self._out.close()
 
275
        ## self._in.close()
 
276
 
 
277
    def _serve_one_request(self):
 
278
        """Read one request from input, process, send back a response.
 
279
        
 
280
        :return: False if the server should terminate, otherwise None.
 
281
        """
 
282
        req_args = self._recv_tuple()
 
283
        mutter('server received %r' % (req_args,))
 
284
        if req_args == None:
 
285
            # client closed connection
 
286
            return False  # shutdown server
 
287
        try:
 
288
            response = self.smart_server.dispatch_command(req_args[0], req_args[1:])
 
289
            mutter('server sending %r' % (response.args,))
 
290
            self._send_tuple(response.args)
 
291
            if response.body is not None:
 
292
                self._send_bulk_data(response.body)
 
293
        except KeyboardInterrupt:
 
294
            raise
 
295
        except Exception, e:
 
296
            # everything else: pass to client, flush, and quit
 
297
            self._send_error_and_disconnect(e)
 
298
            return False
 
299
 
 
300
    def serve(self):
 
301
        """Serve requests until the client disconnects."""
 
302
        # Keep a reference to stderr because the sys module's globals get set to
 
303
        # None during interpreter shutdown.
 
304
        from sys import stderr
 
305
        try:
 
306
            while self._serve_one_request() != False:
 
307
                pass
 
308
        except Exception, e:
 
309
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
310
            raise
 
311
 
 
312
 
 
313
class SmartServerResponse(object):
 
314
    """Response generated by SmartServer."""
 
315
 
 
316
    def __init__(self, args, body=None):
 
317
        self.args = args
 
318
        self.body = body
 
319
 
 
320
# XXX: TODO: Create a SmartServerRequest which will take the responsibility
 
321
# for delivering the data for a request. This could be done with as the
 
322
# StreamServer, though that would create conflation between request and response
 
323
# which may be undesirable.
 
324
 
 
325
 
 
326
class SmartServer(object):
 
327
    """Protocol logic for smart server.
 
328
    
 
329
    This doesn't handle serialization at all, it just processes requests and
 
330
    creates responses.
 
331
    """
 
332
 
 
333
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServer not contain
 
334
    # encoding or decoding logic to allow the wire protocol to vary from the
 
335
    # object protocol: we will want to tweak the wire protocol separate from
 
336
    # the object model, and ideally we will be able to do that without having
 
337
    # a SmartServer subclass for each wire protocol, rather just a Protocol
 
338
    # subclass.
 
339
 
 
340
    # TODO: Better way of representing the body for commands that take it,
 
341
    # and allow it to be streamed into the server.
 
342
    
 
343
    def __init__(self, backing_transport):
 
344
        self._backing_transport = backing_transport
 
345
        
 
346
    def do_hello(self):
 
347
        """Answer a version request with my version."""
 
348
        return SmartServerResponse(('ok', '1'))
 
349
 
 
350
    def do_has(self, relpath):
 
351
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
 
352
        return SmartServerResponse((r,))
 
353
 
 
354
    def do_get(self, relpath):
 
355
        backing_bytes = self._backing_transport.get_bytes(relpath)
 
356
        return SmartServerResponse(('ok',), backing_bytes)
 
357
 
 
358
    def _deserialise_optional_mode(self, mode):
 
359
        # XXX: FIXME this should be on the protocol object.
 
360
        if mode == '':
 
361
            return None
 
362
        else:
 
363
            return int(mode)
 
364
 
 
365
    def do_append(self, relpath, mode):
 
366
        old_length = self._backing_transport.append_bytes(
 
367
            relpath, self._recv_body(), self._deserialise_optional_mode(mode))
 
368
        return SmartServerResponse(('appended', '%d' % old_length))
 
369
 
 
370
    def do_delete(self, relpath):
 
371
        self._backing_transport.delete(relpath)
 
372
 
 
373
    def do_iter_files_recursive(self, abspath):
 
374
        # XXX: the path handling needs some thought.
 
375
        #relpath = self._backing_transport.relpath(abspath)
 
376
        transport = self._backing_transport.clone(abspath)
 
377
        filenames = transport.iter_files_recursive()
 
378
        return SmartServerResponse(('names',) + tuple(filenames))
 
379
 
 
380
    def do_list_dir(self, relpath):
 
381
        filenames = self._backing_transport.list_dir(relpath)
 
382
        return SmartServerResponse(('names',) + tuple(filenames))
 
383
 
 
384
    def do_mkdir(self, relpath, mode):
 
385
        self._backing_transport.mkdir(relpath,
 
386
                                      self._deserialise_optional_mode(mode))
 
387
 
 
388
    def do_move(self, rel_from, rel_to):
 
389
        self._backing_transport.move(rel_from, rel_to)
 
390
 
 
391
    def do_put(self, relpath, mode):
 
392
        self._backing_transport.put_bytes(relpath,
 
393
                self._recv_body(),
 
394
                self._deserialise_optional_mode(mode))
 
395
 
 
396
    def _deserialise_offsets(self, text):
 
397
        # XXX: FIXME this should be on the protocol object.
 
398
        offsets = []
 
399
        for line in text.split('\n'):
 
400
            if not line:
 
401
                continue
 
402
            start, length = line.split(',')
 
403
            offsets.append((int(start), int(length)))
 
404
        return offsets
 
405
 
 
406
    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
 
407
        create_parent_dir = (create_parent == 'T')
 
408
        self._backing_transport.put_bytes_non_atomic(relpath,
 
409
                self._recv_body(),
 
410
                mode=self._deserialise_optional_mode(mode),
 
411
                create_parent_dir=create_parent_dir,
 
412
                dir_mode=self._deserialise_optional_mode(dir_mode))
 
413
 
 
414
    def do_readv(self, relpath):
 
415
        offsets = self._deserialise_offsets(self._recv_body())
 
416
        backing_bytes = ''.join(bytes for offset, bytes in
 
417
                             self._backing_transport.readv(relpath, offsets))
 
418
        return SmartServerResponse(('readv',), backing_bytes)
 
419
        
 
420
    def do_rename(self, rel_from, rel_to):
 
421
        self._backing_transport.rename(rel_from, rel_to)
 
422
 
 
423
    def do_rmdir(self, relpath):
 
424
        self._backing_transport.rmdir(relpath)
 
425
 
 
426
    def do_stat(self, relpath):
 
427
        stat = self._backing_transport.stat(relpath)
 
428
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
 
429
        
 
430
    def do_get_bundle(self, path, revision_id):
 
431
        # open transport relative to our base
 
432
        t = self._backing_transport.clone(path)
 
433
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
 
434
        repo = control.open_repository()
 
435
        tmpf = tempfile.TemporaryFile()
 
436
        base_revision = revision.NULL_REVISION
 
437
        write_bundle(repo, revision_id, base_revision, tmpf)
 
438
        tmpf.seek(0)
 
439
        return SmartServerResponse((), tmpf.read())
 
440
 
 
441
    def dispatch_command(self, cmd, args):
 
442
        func = getattr(self, 'do_' + cmd, None)
 
443
        if func is None:
 
444
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
 
445
        try:
 
446
            result = func(*args)
 
447
            if result is None: 
 
448
                result = SmartServerResponse(('ok',))
 
449
            return result
 
450
        except errors.NoSuchFile, e:
 
451
            return SmartServerResponse(('NoSuchFile', e.path))
 
452
        except errors.FileExists, e:
 
453
            return SmartServerResponse(('FileExists', e.path))
 
454
        except errors.DirectoryNotEmpty, e:
 
455
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
 
456
        except errors.ShortReadvError, e:
 
457
            return SmartServerResponse(('ShortReadvError',
 
458
                e.path, str(e.offset), str(e.length), str(e.actual)))
 
459
        except UnicodeError, e:
 
460
            # If it is a DecodeError, than most likely we are starting
 
461
            # with a plain string
 
462
            str_or_unicode = e.object
 
463
            if isinstance(str_or_unicode, unicode):
 
464
                val = u'u:' + str_or_unicode
 
465
            else:
 
466
                val = u's:' + str_or_unicode.encode('base64')
 
467
            # This handles UnicodeEncodeError or UnicodeDecodeError
 
468
            return SmartServerResponse((e.__class__.__name__,
 
469
                    e.encoding, val, str(e.start), str(e.end), e.reason))
 
470
 
 
471
 
 
472
class SmartTCPServer(object):
 
473
    """Listens on a TCP socket and accepts connections from smart clients"""
 
474
 
 
475
    def __init__(self, backing_transport=None, host='127.0.0.1', port=0):
 
476
        """Construct a new server.
 
477
 
 
478
        To actually start it running, call either start_background_thread or
 
479
        serve.
 
480
 
 
481
        :param host: Name of the interface to listen on.
 
482
        :param port: TCP port to listen on, or 0 to allocate a transient port.
 
483
        """
 
484
        if backing_transport is None:
 
485
            backing_transport = memory.MemoryTransport()
 
486
        self._server_socket = socket.socket()
 
487
        self._server_socket.bind((host, port))
 
488
        self.port = self._server_socket.getsockname()[1]
 
489
        self._server_socket.listen(1)
 
490
        self._server_socket.settimeout(1)
 
491
        self.backing_transport = backing_transport
 
492
 
 
493
    def serve(self):
 
494
        # let connections timeout so that we get a chance to terminate
 
495
        # Keep a reference to the exceptions we want to catch because the socket
 
496
        # module's globals get set to None during interpreter shutdown.
 
497
        from socket import timeout as socket_timeout
 
498
        from socket import error as socket_error
 
499
        self._should_terminate = False
 
500
        while not self._should_terminate:
 
501
            try:
 
502
                self.accept_and_serve()
 
503
            except socket_timeout:
 
504
                # just check if we're asked to stop
 
505
                pass
 
506
            except socket_error, e:
 
507
                trace.warning("client disconnected: %s", e)
 
508
                pass
 
509
 
 
510
    def get_url(self):
 
511
        """Return the url of the server"""
 
512
        return "bzr://%s:%d/" % self._server_socket.getsockname()
 
513
 
 
514
    def accept_and_serve(self):
 
515
        conn, client_addr = self._server_socket.accept()
 
516
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
517
        from_client = conn.makefile('r')
 
518
        to_client = conn.makefile('w')
 
519
        handler = SmartStreamServer(from_client, to_client,
 
520
                self.backing_transport)
 
521
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
 
522
        connection_thread.setDaemon(True)
 
523
        connection_thread.start()
 
524
 
 
525
    def start_background_thread(self):
 
526
        self._server_thread = threading.Thread(None,
 
527
                self.serve,
 
528
                name='server-' + self.get_url())
 
529
        self._server_thread.setDaemon(True)
 
530
        self._server_thread.start()
 
531
 
 
532
    def stop_background_thread(self):
 
533
        self._should_terminate = True
 
534
        # self._server_socket.close()
 
535
        # we used to join the thread, but it's not really necessary; it will
 
536
        # terminate in time
 
537
        ## self._server_thread.join()
 
538
 
 
539
 
 
540
class SmartTCPServer_for_testing(SmartTCPServer):
 
541
    """Server suitable for use by transport tests.
 
542
    
 
543
    This server is backed by the process's cwd.
 
544
    """
 
545
 
 
546
    def __init__(self):
 
547
        self._homedir = os.getcwd()
 
548
        # The server is set up by default like for ssh access: the client
 
549
        # passes filesystem-absolute paths; therefore the server must look
 
550
        # them up relative to the root directory.  it might be better to act
 
551
        # a public server and have the server rewrite paths into the test
 
552
        # directory.
 
553
        SmartTCPServer.__init__(self, transport.get_transport("file:///"))
 
554
        
 
555
    def setUp(self):
 
556
        """Set up server for testing"""
 
557
        self.start_background_thread()
 
558
 
 
559
    def tearDown(self):
 
560
        self.stop_background_thread()
 
561
 
 
562
    def get_url(self):
 
563
        """Return the url of the server"""
 
564
        host, port = self._server_socket.getsockname()
 
565
        # XXX: I think this is likely to break on windows -- self._homedir will
 
566
        # have backslashes (and maybe a drive letter?).
 
567
        #  -- Andrew Bennetts, 2006-08-29
 
568
        return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
 
569
 
 
570
    def get_bogus_url(self):
 
571
        """Return a URL which will fail to connect"""
 
572
        return 'bzr://127.0.0.1:1/'
 
573
 
 
574
 
 
575
class SmartStat(object):
 
576
 
 
577
    def __init__(self, size, mode):
 
578
        self.st_size = size
 
579
        self.st_mode = mode
 
580
 
 
581
 
 
582
class SmartTransport(transport.Transport):
 
583
    """Connection to a smart server.
 
584
 
 
585
    The connection holds references to pipes that can be used to send requests
 
586
    to the server.
 
587
 
 
588
    The connection has a notion of the current directory to which it's
 
589
    connected; this is incorporated in filenames passed to the server.
 
590
    
 
591
    This supports some higher-level RPC operations and can also be treated 
 
592
    like a Transport to do file-like operations.
 
593
 
 
594
    The connection can be made over a tcp socket, or (in future) an ssh pipe
 
595
    or a series of http requests.  There are concrete subclasses for each
 
596
    type: SmartTCPTransport, etc.
 
597
    """
 
598
 
 
599
    # IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
 
600
    # responsibilities: Put those on SmartClient or similar. This is vital for
 
601
    # the ability to support multiple versions of the smart protocol over time:
 
602
    # SmartTransport is an adapter from the Transport object model to the 
 
603
    # SmartClient model, not an encoder.
 
604
 
 
605
    def __init__(self, url, clone_from=None, client=None):
 
606
        """Constructor.
 
607
 
 
608
        :param client: ignored when clone_from is not None.
 
609
        """
 
610
        ### Technically super() here is faulty because Transport's __init__
 
611
        ### fails to take 2 parameters, and if super were to choose a silly
 
612
        ### initialisation order things would blow up. 
 
613
        if not url.endswith('/'):
 
614
            url += '/'
 
615
        super(SmartTransport, self).__init__(url)
 
616
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
 
617
                transport.split_url(url)
 
618
        if clone_from is None:
 
619
            if client is None:
 
620
                self._client = SmartStreamClient(self._connect_to_server)
 
621
            else:
 
622
                self._client = client
 
623
        else:
 
624
            # credentials may be stripped from the base in some circumstances
 
625
            # as yet to be clearly defined or documented, so copy them.
 
626
            self._username = clone_from._username
 
627
            # reuse same connection
 
628
            self._client = clone_from._client
 
629
 
 
630
    def abspath(self, relpath):
 
631
        """Return the full url to the given relative path.
 
632
        
 
633
        @param relpath: the relative path or path components
 
634
        @type relpath: str or list
 
635
        """
 
636
        return self._unparse_url(self._remote_path(relpath))
 
637
    
 
638
    def clone(self, relative_url):
 
639
        """Make a new SmartTransport related to me, sharing the same connection.
 
640
 
 
641
        This essentially opens a handle on a different remote directory.
 
642
        """
 
643
        if relative_url is None:
 
644
            return self.__class__(self.base, self)
 
645
        else:
 
646
            return self.__class__(self.abspath(relative_url), self)
 
647
 
 
648
    def is_readonly(self):
 
649
        """Smart server transport can do read/write file operations."""
 
650
        return False
 
651
                                                   
 
652
    def get_smart_client(self):
 
653
        return self._client
 
654
                                                   
 
655
    def _unparse_url(self, path):
 
656
        """Return URL for a path.
 
657
 
 
658
        :see: SFTPUrlHandling._unparse_url
 
659
        """
 
660
        # TODO: Eventually it should be possible to unify this with
 
661
        # SFTPUrlHandling._unparse_url?
 
662
        if path == '':
 
663
            path = '/'
 
664
        path = urllib.quote(path)
 
665
        netloc = urllib.quote(self._host)
 
666
        if self._username is not None:
 
667
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
 
668
        if self._port is not None:
 
669
            netloc = '%s:%d' % (netloc, self._port)
 
670
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
 
671
 
 
672
    def _remote_path(self, relpath):
 
673
        """Returns the Unicode version of the absolute path for relpath."""
 
674
        return self._combine_paths(self._path, relpath)
 
675
 
 
676
    def has(self, relpath):
 
677
        """Indicate whether a remote file of the given name exists or not.
 
678
 
 
679
        :see: Transport.has()
 
680
        """
 
681
        resp = self._client._call('has', self._remote_path(relpath))
 
682
        if resp == ('yes', ):
 
683
            return True
 
684
        elif resp == ('no', ):
 
685
            return False
 
686
        else:
 
687
            self._translate_error(resp)
 
688
 
 
689
    def get(self, relpath):
 
690
        """Return file-like object reading the contents of a remote file.
 
691
        
 
692
        :see: Transport.get_bytes()/get_file()
 
693
        """
 
694
        remote = self._remote_path(relpath)
 
695
        resp = self._client._call('get', remote)
 
696
        if resp != ('ok', ):
 
697
            self._translate_error(resp, relpath)
 
698
        return StringIO(self._client._recv_bulk())
 
699
 
 
700
    def _serialise_optional_mode(self, mode):
 
701
        if mode is None:
 
702
            return ''
 
703
        else:
 
704
            return '%d' % mode
 
705
 
 
706
    def mkdir(self, relpath, mode=None):
 
707
        resp = self._client._call('mkdir', 
 
708
                                  self._remote_path(relpath), 
 
709
                                  self._serialise_optional_mode(mode))
 
710
        self._translate_error(resp)
 
711
 
 
712
    def put_bytes(self, relpath, upload_contents, mode=None):
 
713
        # FIXME: upload_file is probably not safe for non-ascii characters -
 
714
        # should probably just pass all parameters as length-delimited
 
715
        # strings?
 
716
        resp = self._client._call_with_upload(
 
717
            'put',
 
718
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
719
            upload_contents)
 
720
        self._translate_error(resp)
 
721
 
 
722
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
 
723
                             create_parent_dir=False,
 
724
                             dir_mode=None):
 
725
        """See Transport.put_bytes_non_atomic."""
 
726
        # FIXME: no encoding in the transport!
 
727
        create_parent_str = 'F'
 
728
        if create_parent_dir:
 
729
            create_parent_str = 'T'
 
730
 
 
731
        resp = self._client._call_with_upload(
 
732
            'put_non_atomic',
 
733
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
 
734
             create_parent_str, self._serialise_optional_mode(dir_mode)),
 
735
            bytes)
 
736
        self._translate_error(resp)
 
737
 
 
738
    def put_file(self, relpath, upload_file, mode=None):
 
739
        # its not ideal to seek back, but currently put_non_atomic_file depends
 
740
        # on transports not reading before failing - which is a faulty
 
741
        # assumption I think - RBC 20060915
 
742
        pos = upload_file.tell()
 
743
        try:
 
744
            return self.put_bytes(relpath, upload_file.read(), mode)
 
745
        except:
 
746
            upload_file.seek(pos)
 
747
            raise
 
748
 
 
749
    def put_file_non_atomic(self, relpath, f, mode=None,
 
750
                            create_parent_dir=False,
 
751
                            dir_mode=None):
 
752
        return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
 
753
                                         create_parent_dir=create_parent_dir,
 
754
                                         dir_mode=dir_mode)
 
755
 
 
756
    def append_file(self, relpath, from_file, mode=None):
 
757
        return self.append_bytes(relpath, from_file.read(), mode)
 
758
        
 
759
    def append_bytes(self, relpath, bytes, mode=None):
 
760
        resp = self._client._call_with_upload(
 
761
            'append',
 
762
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
763
            bytes)
 
764
        if resp[0] == 'appended':
 
765
            return int(resp[1])
 
766
        self._translate_error(resp)
 
767
 
 
768
    def delete(self, relpath):
 
769
        resp = self._client._call('delete', self._remote_path(relpath))
 
770
        self._translate_error(resp)
 
771
 
 
772
    def readv(self, relpath, offsets):
 
773
        if not offsets:
 
774
            return
 
775
 
 
776
        offsets = list(offsets)
 
777
 
 
778
        sorted_offsets = sorted(offsets)
 
779
        # turn the list of offsets into a stack
 
780
        offset_stack = iter(offsets)
 
781
        cur_offset_and_size = offset_stack.next()
 
782
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
783
                               limit=self._max_readv_combine,
 
784
                               fudge_factor=self._bytes_to_read_before_seek))
 
785
 
 
786
 
 
787
        resp = self._client._call_with_upload(
 
788
            'readv',
 
789
            (self._remote_path(relpath),),
 
790
            self._client._serialise_offsets((c.start, c.length) for c in coalesced))
 
791
 
 
792
        if resp[0] != 'readv':
 
793
            # This should raise an exception
 
794
            self._translate_error(resp)
 
795
            return
 
796
 
 
797
        data = self._client._recv_bulk()
 
798
        # Cache the results, but only until they have been fulfilled
 
799
        data_map = {}
 
800
        for c_offset in coalesced:
 
801
            if len(data) < c_offset.length:
 
802
                raise errors.ShortReadvError(relpath, c_offset.start,
 
803
                            c_offset.length, actual=len(data))
 
804
            for suboffset, subsize in c_offset.ranges:
 
805
                key = (c_offset.start+suboffset, subsize)
 
806
                data_map[key] = data[suboffset:suboffset+subsize]
 
807
            data = data[c_offset.length:]
 
808
 
 
809
            # Now that we've read some data, see if we can yield anything back
 
810
            while cur_offset_and_size in data_map:
 
811
                this_data = data_map.pop(cur_offset_and_size)
 
812
                yield cur_offset_and_size[0], this_data
 
813
                cur_offset_and_size = offset_stack.next()
 
814
 
 
815
    def rename(self, rel_from, rel_to):
 
816
        self._call('rename', 
 
817
                   self._remote_path(rel_from),
 
818
                   self._remote_path(rel_to))
 
819
 
 
820
    def move(self, rel_from, rel_to):
 
821
        self._call('move', 
 
822
                   self._remote_path(rel_from),
 
823
                   self._remote_path(rel_to))
 
824
 
 
825
    def rmdir(self, relpath):
 
826
        resp = self._call('rmdir', self._remote_path(relpath))
 
827
 
 
828
    def _call(self, method, *args):
 
829
        resp = self._client._call(method, *args)
 
830
        self._translate_error(resp)
 
831
 
 
832
    def _translate_error(self, resp, orig_path=None):
 
833
        """Raise an exception from a response"""
 
834
        if resp is None:
 
835
            what = None
 
836
        else:
 
837
            what = resp[0]
 
838
        if what == 'ok':
 
839
            return
 
840
        elif what == 'NoSuchFile':
 
841
            if orig_path is not None:
 
842
                error_path = orig_path
 
843
            else:
 
844
                error_path = resp[1]
 
845
            raise errors.NoSuchFile(error_path)
 
846
        elif what == 'error':
 
847
            raise errors.SmartProtocolError(unicode(resp[1]))
 
848
        elif what == 'FileExists':
 
849
            raise errors.FileExists(resp[1])
 
850
        elif what == 'DirectoryNotEmpty':
 
851
            raise errors.DirectoryNotEmpty(resp[1])
 
852
        elif what == 'ShortReadvError':
 
853
            raise errors.ShortReadvError(resp[1], int(resp[2]),
 
854
                                         int(resp[3]), int(resp[4]))
 
855
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
 
856
            encoding = str(resp[1]) # encoding must always be a string
 
857
            val = resp[2]
 
858
            start = int(resp[3])
 
859
            end = int(resp[4])
 
860
            reason = str(resp[5]) # reason must always be a string
 
861
            if val.startswith('u:'):
 
862
                val = val[2:]
 
863
            elif val.startswith('s:'):
 
864
                val = val[2:].decode('base64')
 
865
            if what == 'UnicodeDecodeError':
 
866
                raise UnicodeDecodeError(encoding, val, start, end, reason)
 
867
            elif what == 'UnicodeEncodeError':
 
868
                raise UnicodeEncodeError(encoding, val, start, end, reason)
 
869
        else:
 
870
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
 
871
 
 
872
    def _send_tuple(self, args):
 
873
        self._client._send_tuple(args)
 
874
 
 
875
    def _recv_tuple(self):
 
876
        return self._client._recv_tuple()
 
877
 
 
878
    def disconnect(self):
 
879
        self._client.disconnect()
 
880
 
 
881
    def delete_tree(self, relpath):
 
882
        raise errors.TransportNotPossible('readonly transport')
 
883
 
 
884
    def stat(self, relpath):
 
885
        resp = self._client._call('stat', self._remote_path(relpath))
 
886
        if resp[0] == 'stat':
 
887
            return SmartStat(int(resp[1]), int(resp[2], 8))
 
888
        else:
 
889
            self._translate_error(resp)
 
890
 
 
891
    ## def lock_read(self, relpath):
 
892
    ##     """Lock the given file for shared (read) access.
 
893
    ##     :return: A lock object, which should be passed to Transport.unlock()
 
894
    ##     """
 
895
    ##     # The old RemoteBranch ignore lock for reading, so we will
 
896
    ##     # continue that tradition and return a bogus lock object.
 
897
    ##     class BogusLock(object):
 
898
    ##         def __init__(self, path):
 
899
    ##             self.path = path
 
900
    ##         def unlock(self):
 
901
    ##             pass
 
902
    ##     return BogusLock(relpath)
 
903
 
 
904
    def listable(self):
 
905
        return True
 
906
 
 
907
    def list_dir(self, relpath):
 
908
        resp = self._client._call('list_dir',
 
909
                                  self._remote_path(relpath))
 
910
        if resp[0] == 'names':
 
911
            return [name.encode('ascii') for name in resp[1:]]
 
912
        else:
 
913
            self._translate_error(resp)
 
914
 
 
915
    def iter_files_recursive(self):
 
916
        resp = self._client._call('iter_files_recursive',
 
917
                                  self._remote_path(''))
 
918
        if resp[0] == 'names':
 
919
            return resp[1:]
 
920
        else:
 
921
            self._translate_error(resp)
 
922
 
 
923
 
 
924
class SmartStreamClient(SmartProtocolBase):
 
925
    """Connection to smart server over two streams"""
 
926
 
 
927
    def __init__(self, connect_func):
 
928
        self._connect_func = connect_func
 
929
        self._connected = False
 
930
 
 
931
    def __del__(self):
 
932
        self.disconnect()
 
933
 
 
934
    def _ensure_connection(self):
 
935
        if not self._connected:
 
936
            self._in, self._out = self._connect_func()
 
937
            self._connected = True
 
938
 
 
939
    def _send_tuple(self, args):
 
940
        self._ensure_connection()
 
941
        return self._write_and_flush(_encode_tuple(args))
 
942
 
 
943
    def _send_bulk_data(self, body):
 
944
        self._ensure_connection()
 
945
        SmartProtocolBase._send_bulk_data(self, body)
 
946
        
 
947
    def _recv_bulk(self):
 
948
        self._ensure_connection()
 
949
        return SmartProtocolBase._recv_bulk(self)
 
950
 
 
951
    def _recv_tuple(self):
 
952
        self._ensure_connection()
 
953
        return SmartProtocolBase._recv_tuple(self)
 
954
 
 
955
    def _recv_trailer(self):
 
956
        self._ensure_connection()
 
957
        return SmartProtocolBase._recv_trailer(self)
 
958
 
 
959
    def disconnect(self):
 
960
        """Close connection to the server"""
 
961
        if self._connected:
 
962
            self._out.close()
 
963
            self._in.close()
 
964
 
 
965
    def _call(self, *args):
 
966
        self._send_tuple(args)
 
967
        return self._recv_tuple()
 
968
 
 
969
    def _call_with_upload(self, method, args, body):
 
970
        """Call an rpc, supplying bulk upload data.
 
971
 
 
972
        :param method: method name to call
 
973
        :param args: parameter args tuple
 
974
        :param body: upload body as a byte string
 
975
        """
 
976
        self._send_tuple((method,) + args)
 
977
        self._send_bulk_data(body)
 
978
        return self._recv_tuple()
 
979
 
 
980
    def query_version(self):
 
981
        """Return protocol version number of the server."""
 
982
        # XXX: should make sure it's empty
 
983
        self._send_tuple(('hello',))
 
984
        resp = self._recv_tuple()
 
985
        if resp == ('ok', '1'):
 
986
            return 1
 
987
        else:
 
988
            raise errors.SmartProtocolError("bad response %r" % (resp,))
 
989
 
 
990
 
 
991
class SmartTCPTransport(SmartTransport):
 
992
    """Connection to smart server over plain tcp"""
 
993
 
 
994
    def __init__(self, url, clone_from=None):
 
995
        super(SmartTCPTransport, self).__init__(url, clone_from)
 
996
        try:
 
997
            self._port = int(self._port)
 
998
        except (ValueError, TypeError), e:
 
999
            raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
 
1000
        self._socket = None
 
1001
 
 
1002
    def _connect_to_server(self):
 
1003
        self._socket = socket.socket()
 
1004
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
1005
        result = self._socket.connect_ex((self._host, int(self._port)))
 
1006
        if result:
 
1007
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
 
1008
                    (self._host, self._port, os.strerror(result)))
 
1009
        # TODO: May be more efficient to just treat them as sockets
 
1010
        # throughout?  But what about pipes to ssh?...
 
1011
        to_server = self._socket.makefile('w')
 
1012
        from_server = self._socket.makefile('r')
 
1013
        return from_server, to_server
 
1014
 
 
1015
    def disconnect(self):
 
1016
        super(SmartTCPTransport, self).disconnect()
 
1017
        # XXX: Is closing the socket as well as closing the files really
 
1018
        # necessary?
 
1019
        if self._socket is not None:
 
1020
            self._socket.close()
 
1021
 
 
1022
try:
 
1023
    from bzrlib.transport import sftp, ssh
 
1024
except errors.ParamikoNotPresent:
 
1025
    # no paramiko, no SSHTransport.
 
1026
    pass
 
1027
else:
 
1028
    class SmartSSHTransport(SmartTransport):
 
1029
        """Connection to smart server over SSH."""
 
1030
 
 
1031
        def __init__(self, url, clone_from=None):
 
1032
            # TODO: all this probably belongs in the parent class.
 
1033
            super(SmartSSHTransport, self).__init__(url, clone_from)
 
1034
            try:
 
1035
                if self._port is not None:
 
1036
                    self._port = int(self._port)
 
1037
            except (ValueError, TypeError), e:
 
1038
                raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
 
1039
 
 
1040
        def _connect_to_server(self):
 
1041
            # XXX: don't hardcode vendor
 
1042
            # XXX: cannot pass password to SSHSubprocess yet
 
1043
            if self._password is not None:
 
1044
                raise errors.InvalidURL("SSH smart transport doesn't handle passwords")
 
1045
            executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
1046
            vendor = ssh._get_ssh_vendor()
 
1047
            self._ssh_connection = vendor.connect_ssh(self._username, None,
 
1048
                    self._host, self._port,
 
1049
                    command=[executable, 'serve', '--inet',
 
1050
                            '--directory=/'])
 
1051
            return self._ssh_connection.get_filelike_channels()
 
1052
 
 
1053
        def disconnect(self):
 
1054
            super(SmartSSHTransport, self).disconnect()
 
1055
            self._ssh_connection.close()
 
1056
 
 
1057
 
 
1058
def get_test_permutations():
 
1059
    """Return (transport, server) permutations for testing"""
 
1060
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]