/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/remote.py

Commit current test pass improvements.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""RemoteTransport client for the smart-server.
18
18
 
20
20
imported from bzrlib.smart.
21
21
"""
22
22
 
23
 
__all__ = ['RemoteTransport', 'RemoteTCPTransport', 'RemoteSSHTransport']
 
23
__all__ = ['RemoteTransport', 'SmartTCPTransport', 'SmartSSHTransport']
24
24
 
25
25
from cStringIO import StringIO
 
26
import os
 
27
import socket
 
28
import sys
 
29
import tempfile
 
30
import threading
 
31
import urllib
 
32
import urlparse
26
33
 
27
34
from bzrlib import (
28
 
    config,
29
 
    debug,
 
35
    bzrdir,
30
36
    errors,
31
 
    remote,
 
37
    revision,
32
38
    trace,
33
39
    transport,
34
40
    urlutils,
35
41
    )
36
 
from bzrlib.smart import client, medium
37
 
from bzrlib.symbol_versioning import (
38
 
    deprecated_method,
39
 
    )
 
42
from bzrlib.bundle.serializer import write_bundle
 
43
from bzrlib.smart import client, medium, protocol
 
44
 
 
45
# must do this otherwise urllib can't parse the urls properly :(
 
46
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh', 'bzr+http']:
 
47
    transport.register_urlparse_netloc_protocol(scheme)
 
48
del scheme
 
49
 
 
50
 
 
51
# Port 4155 is the default port for bzr://, registered with IANA.
 
52
BZR_DEFAULT_PORT = 4155
 
53
 
 
54
 
 
55
def _recv_tuple(from_file):
 
56
    req_line = from_file.readline()
 
57
    return _decode_tuple(req_line)
 
58
 
 
59
 
 
60
def _decode_tuple(req_line):
 
61
    if req_line == None or req_line == '':
 
62
        return None
 
63
    if req_line[-1] != '\n':
 
64
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
 
65
    return tuple(req_line[:-1].split('\x01'))
 
66
 
 
67
 
 
68
def _encode_tuple(args):
 
69
    """Encode the tuple args to a bytestream."""
 
70
    return '\x01'.join(args) + '\n'
 
71
 
 
72
 
 
73
class SmartProtocolBase(object):
 
74
    """Methods common to client and server"""
 
75
 
 
76
    # TODO: this only actually accomodates a single block; possibly should
 
77
    # support multiple chunks?
 
78
    def _encode_bulk_data(self, body):
 
79
        """Encode body as a bulk data chunk."""
 
80
        return ''.join(('%d\n' % len(body), body, 'done\n'))
 
81
 
 
82
    def _serialise_offsets(self, offsets):
 
83
        """Serialise a readv offset list."""
 
84
        txt = []
 
85
        for start, length in offsets:
 
86
            txt.append('%d,%d' % (start, length))
 
87
        return '\n'.join(txt)
 
88
        
 
89
 
 
90
class SmartServerRequestProtocolOne(SmartProtocolBase):
 
91
    """Server-side encoding and decoding logic for smart version 1."""
 
92
    
 
93
    def __init__(self, backing_transport, write_func):
 
94
        self._backing_transport = backing_transport
 
95
        self.excess_buffer = ''
 
96
        self._finished = False
 
97
        self.in_buffer = ''
 
98
        self.has_dispatched = False
 
99
        self.request = None
 
100
        self._body_decoder = None
 
101
        self._write_func = write_func
 
102
 
 
103
    def accept_bytes(self, bytes):
 
104
        """Take bytes, and advance the internal state machine appropriately.
 
105
        
 
106
        :param bytes: must be a byte string
 
107
        """
 
108
        assert isinstance(bytes, str)
 
109
        self.in_buffer += bytes
 
110
        if not self.has_dispatched:
 
111
            if '\n' not in self.in_buffer:
 
112
                # no command line yet
 
113
                return
 
114
            self.has_dispatched = True
 
115
            try:
 
116
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
 
117
                first_line += '\n'
 
118
                req_args = _decode_tuple(first_line)
 
119
                self.request = SmartServerRequestHandler(
 
120
                    self._backing_transport)
 
121
                self.request.dispatch_command(req_args[0], req_args[1:])
 
122
                if self.request.finished_reading:
 
123
                    # trivial request
 
124
                    self.excess_buffer = self.in_buffer
 
125
                    self.in_buffer = ''
 
126
                    self._send_response(self.request.response.args,
 
127
                        self.request.response.body)
 
128
            except KeyboardInterrupt:
 
129
                raise
 
130
            except Exception, exception:
 
131
                # everything else: pass to client, flush, and quit
 
132
                self._send_response(('error', str(exception)))
 
133
                return
 
134
 
 
135
        if self.has_dispatched:
 
136
            if self._finished:
 
137
                # nothing to do.XXX: this routine should be a single state 
 
138
                # machine too.
 
139
                self.excess_buffer += self.in_buffer
 
140
                self.in_buffer = ''
 
141
                return
 
142
            if self._body_decoder is None:
 
143
                self._body_decoder = LengthPrefixedBodyDecoder()
 
144
            self._body_decoder.accept_bytes(self.in_buffer)
 
145
            self.in_buffer = self._body_decoder.unused_data
 
146
            body_data = self._body_decoder.read_pending_data()
 
147
            self.request.accept_body(body_data)
 
148
            if self._body_decoder.finished_reading:
 
149
                self.request.end_of_body()
 
150
                assert self.request.finished_reading, \
 
151
                    "no more body, request not finished"
 
152
            if self.request.response is not None:
 
153
                self._send_response(self.request.response.args,
 
154
                    self.request.response.body)
 
155
                self.excess_buffer = self.in_buffer
 
156
                self.in_buffer = ''
 
157
            else:
 
158
                assert not self.request.finished_reading, \
 
159
                    "no response and we have finished reading."
 
160
 
 
161
    def _send_response(self, args, body=None):
 
162
        """Send a smart server response down the output stream."""
 
163
        assert not self._finished, 'response already sent'
 
164
        self._finished = True
 
165
        self._write_func(_encode_tuple(args))
 
166
        if body is not None:
 
167
            assert isinstance(body, str), 'body must be a str'
 
168
            bytes = self._encode_bulk_data(body)
 
169
            self._write_func(bytes)
 
170
 
 
171
    def next_read_size(self):
 
172
        if self._finished:
 
173
            return 0
 
174
        if self._body_decoder is None:
 
175
            return 1
 
176
        else:
 
177
            return self._body_decoder.next_read_size()
 
178
 
 
179
 
 
180
class LengthPrefixedBodyDecoder(object):
 
181
    """Decodes the length-prefixed bulk data."""
 
182
    
 
183
    def __init__(self):
 
184
        self.bytes_left = None
 
185
        self.finished_reading = False
 
186
        self.unused_data = ''
 
187
        self.state_accept = self._state_accept_expecting_length
 
188
        self.state_read = self._state_read_no_data
 
189
        self._in_buffer = ''
 
190
        self._trailer_buffer = ''
 
191
    
 
192
    def accept_bytes(self, bytes):
 
193
        """Decode as much of bytes as possible.
 
194
 
 
195
        If 'bytes' contains too much data it will be appended to
 
196
        self.unused_data.
 
197
 
 
198
        finished_reading will be set when no more data is required.  Further
 
199
        data will be appended to self.unused_data.
 
200
        """
 
201
        # accept_bytes is allowed to change the state
 
202
        current_state = self.state_accept
 
203
        self.state_accept(bytes)
 
204
        while current_state != self.state_accept:
 
205
            current_state = self.state_accept
 
206
            self.state_accept('')
 
207
 
 
208
    def next_read_size(self):
 
209
        if self.bytes_left is not None:
 
210
            # Ideally we want to read all the remainder of the body and the
 
211
            # trailer in one go.
 
212
            return self.bytes_left + 5
 
213
        elif self.state_accept == self._state_accept_reading_trailer:
 
214
            # Just the trailer left
 
215
            return 5 - len(self._trailer_buffer)
 
216
        elif self.state_accept == self._state_accept_expecting_length:
 
217
            # There's still at least 6 bytes left ('\n' to end the length, plus
 
218
            # 'done\n').
 
219
            return 6
 
220
        else:
 
221
            # Reading excess data.  Either way, 1 byte at a time is fine.
 
222
            return 1
 
223
        
 
224
    def read_pending_data(self):
 
225
        """Return any pending data that has been decoded."""
 
226
        return self.state_read()
 
227
 
 
228
    def _state_accept_expecting_length(self, bytes):
 
229
        self._in_buffer += bytes
 
230
        pos = self._in_buffer.find('\n')
 
231
        if pos == -1:
 
232
            return
 
233
        self.bytes_left = int(self._in_buffer[:pos])
 
234
        self._in_buffer = self._in_buffer[pos+1:]
 
235
        self.bytes_left -= len(self._in_buffer)
 
236
        self.state_accept = self._state_accept_reading_body
 
237
        self.state_read = self._state_read_in_buffer
 
238
 
 
239
    def _state_accept_reading_body(self, bytes):
 
240
        self._in_buffer += bytes
 
241
        self.bytes_left -= len(bytes)
 
242
        if self.bytes_left <= 0:
 
243
            # Finished with body
 
244
            if self.bytes_left != 0:
 
245
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
 
246
                self._in_buffer = self._in_buffer[:self.bytes_left]
 
247
            self.bytes_left = None
 
248
            self.state_accept = self._state_accept_reading_trailer
 
249
        
 
250
    def _state_accept_reading_trailer(self, bytes):
 
251
        self._trailer_buffer += bytes
 
252
        # TODO: what if the trailer does not match "done\n"?  Should this raise
 
253
        # a ProtocolViolation exception?
 
254
        if self._trailer_buffer.startswith('done\n'):
 
255
            self.unused_data = self._trailer_buffer[len('done\n'):]
 
256
            self.state_accept = self._state_accept_reading_unused
 
257
            self.finished_reading = True
 
258
    
 
259
    def _state_accept_reading_unused(self, bytes):
 
260
        self.unused_data += bytes
 
261
 
 
262
    def _state_read_no_data(self):
 
263
        return ''
 
264
 
 
265
    def _state_read_in_buffer(self):
 
266
        result = self._in_buffer
 
267
        self._in_buffer = ''
 
268
        return result
 
269
 
 
270
 
 
271
class SmartServerStreamMedium(object):
 
272
    """Handles smart commands coming over a stream.
 
273
 
 
274
    The stream may be a pipe connected to sshd, or a tcp socket, or an
 
275
    in-process fifo for testing.
 
276
 
 
277
    One instance is created for each connected client; it can serve multiple
 
278
    requests in the lifetime of the connection.
 
279
 
 
280
    The server passes requests through to an underlying backing transport, 
 
281
    which will typically be a LocalTransport looking at the server's filesystem.
 
282
    """
 
283
 
 
284
    def __init__(self, backing_transport):
 
285
        """Construct new server.
 
286
 
 
287
        :param backing_transport: Transport for the directory served.
 
288
        """
 
289
        # backing_transport could be passed to serve instead of __init__
 
290
        self.backing_transport = backing_transport
 
291
        self.finished = False
 
292
 
 
293
    def serve(self):
 
294
        """Serve requests until the client disconnects."""
 
295
        # Keep a reference to stderr because the sys module's globals get set to
 
296
        # None during interpreter shutdown.
 
297
        from sys import stderr
 
298
        try:
 
299
            while not self.finished:
 
300
                protocol = SmartServerRequestProtocolOne(self.backing_transport,
 
301
                                                         self._write_out)
 
302
                self._serve_one_request(protocol)
 
303
        except Exception, e:
 
304
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
305
            raise
 
306
 
 
307
    def _serve_one_request(self, protocol):
 
308
        """Read one request from input, process, send back a response.
 
309
        
 
310
        :param protocol: a SmartServerRequestProtocol.
 
311
        """
 
312
        try:
 
313
            self._serve_one_request_unguarded(protocol)
 
314
        except KeyboardInterrupt:
 
315
            raise
 
316
        except Exception, e:
 
317
            self.terminate_due_to_error()
 
318
 
 
319
    def terminate_due_to_error(self):
 
320
        """Called when an unhandled exception from the protocol occurs."""
 
321
        raise NotImplementedError(self.terminate_due_to_error)
 
322
 
 
323
 
 
324
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
 
325
 
 
326
    def __init__(self, sock, backing_transport):
 
327
        """Constructor.
 
328
 
 
329
        :param sock: the socket the server will read from.  It will be put
 
330
            into blocking mode.
 
331
        """
 
332
        SmartServerStreamMedium.__init__(self, backing_transport)
 
333
        self.push_back = ''
 
334
        sock.setblocking(True)
 
335
        self.socket = sock
 
336
 
 
337
    def _serve_one_request_unguarded(self, protocol):
 
338
        while protocol.next_read_size():
 
339
            if self.push_back:
 
340
                protocol.accept_bytes(self.push_back)
 
341
                self.push_back = ''
 
342
            else:
 
343
                bytes = self.socket.recv(4096)
 
344
                if bytes == '':
 
345
                    self.finished = True
 
346
                    return
 
347
                protocol.accept_bytes(bytes)
 
348
        
 
349
        self.push_back = protocol.excess_buffer
 
350
    
 
351
    def terminate_due_to_error(self):
 
352
        """Called when an unhandled exception from the protocol occurs."""
 
353
        # TODO: This should log to a server log file, but no such thing
 
354
        # exists yet.  Andrew Bennetts 2006-09-29.
 
355
        self.socket.close()
 
356
        self.finished = True
 
357
 
 
358
    def _write_out(self, bytes):
 
359
        self.socket.sendall(bytes)
 
360
 
 
361
 
 
362
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
 
363
 
 
364
    def __init__(self, in_file, out_file, backing_transport):
 
365
        """Construct new server.
 
366
 
 
367
        :param in_file: Python file from which requests can be read.
 
368
        :param out_file: Python file to write responses.
 
369
        :param backing_transport: Transport for the directory served.
 
370
        """
 
371
        SmartServerStreamMedium.__init__(self, backing_transport)
 
372
        if sys.platform == 'win32':
 
373
            # force binary mode for files
 
374
            import msvcrt
 
375
            for f in (in_file, out_file):
 
376
                fileno = getattr(f, 'fileno', None)
 
377
                if fileno:
 
378
                    msvcrt.setmode(fileno(), os.O_BINARY)
 
379
        self._in = in_file
 
380
        self._out = out_file
 
381
 
 
382
    def _serve_one_request_unguarded(self, protocol):
 
383
        while True:
 
384
            bytes_to_read = protocol.next_read_size()
 
385
            if bytes_to_read == 0:
 
386
                # Finished serving this request.
 
387
                self._out.flush()
 
388
                return
 
389
            bytes = self._in.read(bytes_to_read)
 
390
            if bytes == '':
 
391
                # Connection has been closed.
 
392
                self.finished = True
 
393
                self._out.flush()
 
394
                return
 
395
            protocol.accept_bytes(bytes)
 
396
 
 
397
    def terminate_due_to_error(self):
 
398
        # TODO: This should log to a server log file, but no such thing
 
399
        # exists yet.  Andrew Bennetts 2006-09-29.
 
400
        self._out.close()
 
401
        self.finished = True
 
402
 
 
403
    def _write_out(self, bytes):
 
404
        self._out.write(bytes)
 
405
 
 
406
 
 
407
class SmartServerResponse(object):
 
408
    """Response generated by SmartServerRequestHandler."""
 
409
 
 
410
    def __init__(self, args, body=None):
 
411
        self.args = args
 
412
        self.body = body
 
413
 
 
414
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
 
415
# for delivering the data for a request. This could be done with as the
 
416
# StreamServer, though that would create conflation between request and response
 
417
# which may be undesirable.
 
418
 
 
419
 
 
420
class SmartServerRequestHandler(object):
 
421
    """Protocol logic for smart server.
 
422
    
 
423
    This doesn't handle serialization at all, it just processes requests and
 
424
    creates responses.
 
425
    """
 
426
 
 
427
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
 
428
    # not contain encoding or decoding logic to allow the wire protocol to vary
 
429
    # from the object protocol: we will want to tweak the wire protocol separate
 
430
    # from the object model, and ideally we will be able to do that without
 
431
    # having a SmartServerRequestHandler subclass for each wire protocol, rather
 
432
    # just a Protocol subclass.
 
433
 
 
434
    # TODO: Better way of representing the body for commands that take it,
 
435
    # and allow it to be streamed into the server.
 
436
    
 
437
    def __init__(self, backing_transport):
 
438
        self._backing_transport = backing_transport
 
439
        self._converted_command = False
 
440
        self.finished_reading = False
 
441
        self._body_bytes = ''
 
442
        self.response = None
 
443
 
 
444
    def accept_body(self, bytes):
 
445
        """Accept body data.
 
446
 
 
447
        This should be overriden for each command that desired body data to
 
448
        handle the right format of that data. I.e. plain bytes, a bundle etc.
 
449
 
 
450
        The deserialisation into that format should be done in the Protocol
 
451
        object. Set self.desired_body_format to the format your method will
 
452
        handle.
 
453
        """
 
454
        # default fallback is to accumulate bytes.
 
455
        self._body_bytes += bytes
 
456
        
 
457
    def _end_of_body_handler(self):
 
458
        """An unimplemented end of body handler."""
 
459
        raise NotImplementedError(self._end_of_body_handler)
 
460
        
 
461
    def do_hello(self):
 
462
        """Answer a version request with my version."""
 
463
        return SmartServerResponse(('ok', '1'))
 
464
 
 
465
    def do_has(self, relpath):
 
466
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
 
467
        return SmartServerResponse((r,))
 
468
 
 
469
    def do_get(self, relpath):
 
470
        backing_bytes = self._backing_transport.get_bytes(relpath)
 
471
        return SmartServerResponse(('ok',), backing_bytes)
 
472
 
 
473
    def _deserialise_optional_mode(self, mode):
 
474
        # XXX: FIXME this should be on the protocol object.
 
475
        if mode == '':
 
476
            return None
 
477
        else:
 
478
            return int(mode)
 
479
 
 
480
    def do_append(self, relpath, mode):
 
481
        self._converted_command = True
 
482
        self._relpath = relpath
 
483
        self._mode = self._deserialise_optional_mode(mode)
 
484
        self._end_of_body_handler = self._handle_do_append_end
 
485
    
 
486
    def _handle_do_append_end(self):
 
487
        old_length = self._backing_transport.append_bytes(
 
488
            self._relpath, self._body_bytes, self._mode)
 
489
        self.response = SmartServerResponse(('appended', '%d' % old_length))
 
490
 
 
491
    def do_delete(self, relpath):
 
492
        self._backing_transport.delete(relpath)
 
493
 
 
494
    def do_iter_files_recursive(self, relpath):
 
495
        transport = self._backing_transport.clone(relpath)
 
496
        filenames = transport.iter_files_recursive()
 
497
        return SmartServerResponse(('names',) + tuple(filenames))
 
498
 
 
499
    def do_list_dir(self, relpath):
 
500
        filenames = self._backing_transport.list_dir(relpath)
 
501
        return SmartServerResponse(('names',) + tuple(filenames))
 
502
 
 
503
    def do_mkdir(self, relpath, mode):
 
504
        self._backing_transport.mkdir(relpath,
 
505
                                      self._deserialise_optional_mode(mode))
 
506
 
 
507
    def do_move(self, rel_from, rel_to):
 
508
        self._backing_transport.move(rel_from, rel_to)
 
509
 
 
510
    def do_put(self, relpath, mode):
 
511
        self._converted_command = True
 
512
        self._relpath = relpath
 
513
        self._mode = self._deserialise_optional_mode(mode)
 
514
        self._end_of_body_handler = self._handle_do_put
 
515
 
 
516
    def _handle_do_put(self):
 
517
        self._backing_transport.put_bytes(self._relpath,
 
518
                self._body_bytes, self._mode)
 
519
        self.response = SmartServerResponse(('ok',))
 
520
 
 
521
    def _deserialise_offsets(self, text):
 
522
        # XXX: FIXME this should be on the protocol object.
 
523
        offsets = []
 
524
        for line in text.split('\n'):
 
525
            if not line:
 
526
                continue
 
527
            start, length = line.split(',')
 
528
            offsets.append((int(start), int(length)))
 
529
        return offsets
 
530
 
 
531
    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
 
532
        self._converted_command = True
 
533
        self._end_of_body_handler = self._handle_put_non_atomic
 
534
        self._relpath = relpath
 
535
        self._dir_mode = self._deserialise_optional_mode(dir_mode)
 
536
        self._mode = self._deserialise_optional_mode(mode)
 
537
        # a boolean would be nicer XXX
 
538
        self._create_parent = (create_parent == 'T')
 
539
 
 
540
    def _handle_put_non_atomic(self):
 
541
        self._backing_transport.put_bytes_non_atomic(self._relpath,
 
542
                self._body_bytes,
 
543
                mode=self._mode,
 
544
                create_parent_dir=self._create_parent,
 
545
                dir_mode=self._dir_mode)
 
546
        self.response = SmartServerResponse(('ok',))
 
547
 
 
548
    def do_readv(self, relpath):
 
549
        self._converted_command = True
 
550
        self._end_of_body_handler = self._handle_readv_offsets
 
551
        self._relpath = relpath
 
552
 
 
553
    def end_of_body(self):
 
554
        """No more body data will be received."""
 
555
        self._run_handler_code(self._end_of_body_handler, (), {})
 
556
        # cannot read after this.
 
557
        self.finished_reading = True
 
558
 
 
559
    def _handle_readv_offsets(self):
 
560
        """accept offsets for a readv request."""
 
561
        offsets = self._deserialise_offsets(self._body_bytes)
 
562
        backing_bytes = ''.join(bytes for offset, bytes in
 
563
            self._backing_transport.readv(self._relpath, offsets))
 
564
        self.response = SmartServerResponse(('readv',), backing_bytes)
 
565
        
 
566
    def do_rename(self, rel_from, rel_to):
 
567
        self._backing_transport.rename(rel_from, rel_to)
 
568
 
 
569
    def do_rmdir(self, relpath):
 
570
        self._backing_transport.rmdir(relpath)
 
571
 
 
572
    def do_stat(self, relpath):
 
573
        stat = self._backing_transport.stat(relpath)
 
574
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
 
575
        
 
576
    def do_get_bundle(self, path, revision_id):
 
577
        # open transport relative to our base
 
578
        t = self._backing_transport.clone(path)
 
579
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
 
580
        repo = control.open_repository()
 
581
        tmpf = tempfile.TemporaryFile()
 
582
        base_revision = revision.NULL_REVISION
 
583
        write_bundle(repo, revision_id, base_revision, tmpf)
 
584
        tmpf.seek(0)
 
585
        return SmartServerResponse((), tmpf.read())
 
586
 
 
587
    def dispatch_command(self, cmd, args):
 
588
        """Deprecated compatibility method.""" # XXX XXX
 
589
        func = getattr(self, 'do_' + cmd, None)
 
590
        if func is None:
 
591
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
 
592
        self._run_handler_code(func, args, {})
 
593
 
 
594
    def _run_handler_code(self, callable, args, kwargs):
 
595
        """Run some handler specific code 'callable'.
 
596
 
 
597
        If a result is returned, it is considered to be the commands response,
 
598
        and finished_reading is set true, and its assigned to self.response.
 
599
 
 
600
        Any exceptions caught are translated and a response object created
 
601
        from them.
 
602
        """
 
603
        result = self._call_converting_errors(callable, args, kwargs)
 
604
        if result is not None:
 
605
            self.response = result
 
606
            self.finished_reading = True
 
607
        # handle unconverted commands
 
608
        if not self._converted_command:
 
609
            self.finished_reading = True
 
610
            if result is None:
 
611
                self.response = SmartServerResponse(('ok',))
 
612
 
 
613
    def _call_converting_errors(self, callable, args, kwargs):
 
614
        """Call callable converting errors to Response objects."""
 
615
        try:
 
616
            return callable(*args, **kwargs)
 
617
        except errors.NoSuchFile, e:
 
618
            return SmartServerResponse(('NoSuchFile', e.path))
 
619
        except errors.FileExists, e:
 
620
            return SmartServerResponse(('FileExists', e.path))
 
621
        except errors.DirectoryNotEmpty, e:
 
622
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
 
623
        except errors.ShortReadvError, e:
 
624
            return SmartServerResponse(('ShortReadvError',
 
625
                e.path, str(e.offset), str(e.length), str(e.actual)))
 
626
        except UnicodeError, e:
 
627
            # If it is a DecodeError, than most likely we are starting
 
628
            # with a plain string
 
629
            str_or_unicode = e.object
 
630
            if isinstance(str_or_unicode, unicode):
 
631
                # XXX: UTF-8 might have \x01 (our seperator byte) in it.  We
 
632
                # should escape it somehow.
 
633
                val = 'u:' + str_or_unicode.encode('utf-8')
 
634
            else:
 
635
                val = 's:' + str_or_unicode.encode('base64')
 
636
            # This handles UnicodeEncodeError or UnicodeDecodeError
 
637
            return SmartServerResponse((e.__class__.__name__,
 
638
                    e.encoding, val, str(e.start), str(e.end), e.reason))
 
639
        except errors.TransportNotPossible, e:
 
640
            if e.msg == "readonly transport":
 
641
                return SmartServerResponse(('ReadOnlyError', ))
 
642
            else:
 
643
                raise
 
644
 
 
645
 
 
646
class SmartTCPServer(object):
 
647
    """Listens on a TCP socket and accepts connections from smart clients"""
 
648
 
 
649
    def __init__(self, backing_transport, host='127.0.0.1', port=0):
 
650
        """Construct a new server.
 
651
 
 
652
        To actually start it running, call either start_background_thread or
 
653
        serve.
 
654
 
 
655
        :param host: Name of the interface to listen on.
 
656
        :param port: TCP port to listen on, or 0 to allocate a transient port.
 
657
        """
 
658
        self._server_socket = socket.socket()
 
659
        self._server_socket.bind((host, port))
 
660
        self.port = self._server_socket.getsockname()[1]
 
661
        self._server_socket.listen(1)
 
662
        self._server_socket.settimeout(1)
 
663
        self.backing_transport = backing_transport
 
664
 
 
665
    def serve(self):
 
666
        # let connections timeout so that we get a chance to terminate
 
667
        # Keep a reference to the exceptions we want to catch because the socket
 
668
        # module's globals get set to None during interpreter shutdown.
 
669
        from socket import timeout as socket_timeout
 
670
        from socket import error as socket_error
 
671
        self._should_terminate = False
 
672
        while not self._should_terminate:
 
673
            try:
 
674
                self.accept_and_serve()
 
675
            except socket_timeout:
 
676
                # just check if we're asked to stop
 
677
                pass
 
678
            except socket_error, e:
 
679
                trace.warning("client disconnected: %s", e)
 
680
                pass
 
681
 
 
682
    def get_url(self):
 
683
        """Return the url of the server"""
 
684
        return "bzr://%s:%d/" % self._server_socket.getsockname()
 
685
 
 
686
    def accept_and_serve(self):
 
687
        conn, client_addr = self._server_socket.accept()
 
688
        # For WIN32, where the timeout value from the listening socket
 
689
        # propogates to the newly accepted socket.
 
690
        conn.setblocking(True)
 
691
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
692
        handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
 
693
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
 
694
        connection_thread.setDaemon(True)
 
695
        connection_thread.start()
 
696
 
 
697
    def start_background_thread(self):
 
698
        self._server_thread = threading.Thread(None,
 
699
                self.serve,
 
700
                name='server-' + self.get_url())
 
701
        self._server_thread.setDaemon(True)
 
702
        self._server_thread.start()
 
703
 
 
704
    def stop_background_thread(self):
 
705
        self._should_terminate = True
 
706
        # At one point we would wait to join the threads here, but it looks
 
707
        # like they don't actually exit.  So now we just leave them running
 
708
        # and expect to terminate the process. -- mbp 20070215
 
709
        # self._server_socket.close()
 
710
        ## sys.stderr.write("waiting for server thread to finish...")
 
711
        ## self._server_thread.join()
 
712
 
 
713
 
 
714
class SmartTCPServer_for_testing(SmartTCPServer):
 
715
    """Server suitable for use by transport tests.
 
716
    
 
717
    This server is backed by the process's cwd.
 
718
    """
 
719
 
 
720
    def __init__(self):
 
721
        self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
 
722
        # The server is set up by default like for ssh access: the client
 
723
        # passes filesystem-absolute paths; therefore the server must look
 
724
        # them up relative to the root directory.  it might be better to act
 
725
        # a public server and have the server rewrite paths into the test
 
726
        # directory.
 
727
        SmartTCPServer.__init__(self,
 
728
            transport.get_transport(urlutils.local_path_to_url('/')))
 
729
        
 
730
    def get_backing_transport(self, backing_transport_server):
 
731
        """Get a backing transport from a server we are decorating."""
 
732
        return transport.get_transport(backing_transport_server.get_url())
 
733
 
 
734
    def setUp(self, backing_transport_server=None):
 
735
        """Set up server for testing"""
 
736
        from bzrlib.transport.chroot import TestingChrootServer
 
737
        if backing_transport_server is None:
 
738
            from bzrlib.transport.local import LocalURLServer
 
739
            backing_transport_server = LocalURLServer()
 
740
        self.chroot_server = TestingChrootServer()
 
741
        self.chroot_server.setUp(backing_transport_server)
 
742
        self.backing_transport = transport.get_transport(
 
743
            self.chroot_server.get_url())
 
744
        self.start_background_thread()
 
745
 
 
746
    def tearDown(self):
 
747
        self.stop_background_thread()
 
748
 
 
749
    def get_bogus_url(self):
 
750
        """Return a URL which will fail to connect"""
 
751
        return 'bzr://127.0.0.1:1/'
40
752
 
41
753
 
42
754
class _SmartStat(object):
46
758
        self.st_mode = mode
47
759
 
48
760
 
49
 
class RemoteTransport(transport.ConnectedTransport):
 
761
class RemoteTransport(transport.Transport):
50
762
    """Connection to a smart server.
51
763
 
52
764
    The connection holds references to the medium that can be used to send
54
766
 
55
767
    The connection has a notion of the current directory to which it's
56
768
    connected; this is incorporated in filenames passed to the server.
57
 
 
58
 
    This supports some higher-level RPC operations and can also be treated
 
769
    
 
770
    This supports some higher-level RPC operations and can also be treated 
59
771
    like a Transport to do file-like operations.
60
772
 
61
773
    The connection can be made over a tcp socket, an ssh pipe or a series of
62
774
    http requests.  There are concrete subclasses for each type:
63
 
    RemoteTCPTransport, etc.
 
775
    SmartTCPTransport, etc.
64
776
    """
65
777
 
66
 
    # When making a readv request, cap it at requesting 5MB of data
67
 
    _max_readv_bytes = 5*1024*1024
68
 
 
69
778
    # IMPORTANT FOR IMPLEMENTORS: RemoteTransport MUST NOT be given encoding
70
779
    # responsibilities: Put those on SmartClient or similar. This is vital for
71
780
    # the ability to support multiple versions of the smart protocol over time:
72
 
    # RemoteTransport is an adapter from the Transport object model to the
 
781
    # RemoteTransport is an adapter from the Transport object model to the 
73
782
    # SmartClient model, not an encoder.
74
783
 
75
 
    # FIXME: the medium parameter should be private, only the tests requires
76
 
    # it. It may be even clearer to define a TestRemoteTransport that handles
77
 
    # the specific cases of providing a _client and/or a _medium, and leave
78
 
    # RemoteTransport as an abstract class.
79
 
    def __init__(self, url, _from_transport=None, medium=None, _client=None):
 
784
    def __init__(self, url, clone_from=None, medium=None):
80
785
        """Constructor.
81
786
 
82
 
        :param _from_transport: Another RemoteTransport instance that this
83
 
            one is being cloned from.  Attributes such as the medium will
84
 
            be reused.
85
 
 
86
 
        :param medium: The medium to use for this RemoteTransport.  If None,
87
 
            the medium from the _from_transport is shared.  If both this
88
 
            and _from_transport are None, a new medium will be built.
89
 
            _from_transport and medium cannot both be specified.
90
 
 
91
 
        :param _client: Override the _SmartClient used by this transport.  This
92
 
            should only be used for testing purposes; normally this is
93
 
            determined from the medium.
94
 
        """
95
 
        super(RemoteTransport, self).__init__(
96
 
            url, _from_transport=_from_transport)
97
 
 
98
 
        # The medium is the connection, except when we need to share it with
99
 
        # other objects (RemoteBzrDir, RemoteRepository etc). In these cases
100
 
        # what we want to share is really the shared connection.
101
 
 
102
 
        if (_from_transport is not None
103
 
            and isinstance(_from_transport, RemoteTransport)):
104
 
            _client = _from_transport._client
105
 
        elif _from_transport is None:
106
 
            # If no _from_transport is specified, we need to intialize the
107
 
            # shared medium.
108
 
            credentials = None
109
 
            if medium is None:
110
 
                medium, credentials = self._build_medium()
111
 
                if 'hpss' in debug.debug_flags:
112
 
                    trace.mutter('hpss: Built a new medium: %s',
113
 
                                 medium.__class__.__name__)
114
 
            self._shared_connection = transport._SharedConnection(medium,
115
 
                                                                  credentials,
116
 
                                                                  self.base)
117
 
        elif medium is None:
118
 
            # No medium was specified, so share the medium from the
119
 
            # _from_transport.
120
 
            medium = self._shared_connection.connection
121
 
        else:
122
 
            raise AssertionError(
123
 
                "Both _from_transport (%r) and medium (%r) passed to "
124
 
                "RemoteTransport.__init__, but these parameters are mutally "
125
 
                "exclusive." % (_from_transport, medium))
126
 
 
127
 
        if _client is None:
128
 
            self._client = client._SmartClient(medium)
129
 
        else:
130
 
            self._client = _client
131
 
 
132
 
    def _build_medium(self):
133
 
        """Create the medium if _from_transport does not provide one.
134
 
 
135
 
        The medium is analogous to the connection for ConnectedTransport: it
136
 
        allows connection sharing.
137
 
        """
138
 
        # No credentials
139
 
        return None, None
140
 
 
141
 
    def _report_activity(self, bytes, direction):
142
 
        """See Transport._report_activity.
143
 
 
144
 
        Does nothing; the smart medium will report activity triggered by a
145
 
        RemoteTransport.
146
 
        """
147
 
        pass
 
787
        :param medium: The medium to use for this RemoteTransport. This must be
 
788
            supplied if clone_from is None.
 
789
        """
 
790
        ### Technically super() here is faulty because Transport's __init__
 
791
        ### fails to take 2 parameters, and if super were to choose a silly
 
792
        ### initialisation order things would blow up. 
 
793
        if not url.endswith('/'):
 
794
            url += '/'
 
795
        super(RemoteTransport, self).__init__(url)
 
796
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
 
797
                transport.split_url(url)
 
798
        if clone_from is None:
 
799
            self._medium = medium
 
800
        else:
 
801
            # credentials may be stripped from the base in some circumstances
 
802
            # as yet to be clearly defined or documented, so copy them.
 
803
            self._username = clone_from._username
 
804
            # reuse same connection
 
805
            self._medium = clone_from._medium
 
806
        assert self._medium is not None
 
807
 
 
808
    def abspath(self, relpath):
 
809
        """Return the full url to the given relative path.
 
810
        
 
811
        @param relpath: the relative path or path components
 
812
        @type relpath: str or list
 
813
        """
 
814
        return self._unparse_url(self._remote_path(relpath))
 
815
    
 
816
    def clone(self, relative_url):
 
817
        """Make a new RemoteTransport related to me, sharing the same connection.
 
818
 
 
819
        This essentially opens a handle on a different remote directory.
 
820
        """
 
821
        if relative_url is None:
 
822
            return RemoteTransport(self.base, self)
 
823
        else:
 
824
            return RemoteTransport(self.abspath(relative_url), self)
148
825
 
149
826
    def is_readonly(self):
150
827
        """Smart server transport can do read/write file operations."""
151
 
        try:
152
 
            resp = self._call2('Transport.is_readonly')
153
 
        except errors.UnknownSmartMethod:
154
 
            # XXX: nasty hack: servers before 0.16 don't have a
155
 
            # 'Transport.is_readonly' verb, so we do what clients before 0.16
156
 
            # did: assume False.
157
 
            return False
 
828
        resp = self._call2('Transport.is_readonly')
158
829
        if resp == ('yes', ):
159
830
            return True
160
831
        elif resp == ('no', ):
161
832
            return False
162
833
        else:
163
 
            raise errors.UnexpectedSmartServerResponse(resp)
 
834
            self._translate_error(resp)
 
835
        assert False, 'weird response %r' % (resp,)
164
836
 
165
837
    def get_smart_client(self):
166
 
        return self._get_connection()
 
838
        return self._medium
167
839
 
168
840
    def get_smart_medium(self):
169
 
        return self._get_connection()
 
841
        return self._medium
 
842
                                                   
 
843
    def _unparse_url(self, path):
 
844
        """Return URL for a path.
 
845
 
 
846
        :see: SFTPUrlHandling._unparse_url
 
847
        """
 
848
        # TODO: Eventually it should be possible to unify this with
 
849
        # SFTPUrlHandling._unparse_url?
 
850
        if path == '':
 
851
            path = '/'
 
852
        path = urllib.quote(path)
 
853
        netloc = urllib.quote(self._host)
 
854
        if self._username is not None:
 
855
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
 
856
        if self._port is not None:
 
857
            netloc = '%s:%d' % (netloc, self._port)
 
858
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
170
859
 
171
860
    def _remote_path(self, relpath):
172
861
        """Returns the Unicode version of the absolute path for relpath."""
174
863
 
175
864
    def _call(self, method, *args):
176
865
        resp = self._call2(method, *args)
177
 
        self._ensure_ok(resp)
 
866
        self._translate_error(resp)
178
867
 
179
868
    def _call2(self, method, *args):
180
869
        """Call a method on the remote server."""
181
 
        try:
182
 
            return self._client.call(method, *args)
183
 
        except errors.ErrorFromSmartServer, err:
184
 
            # The first argument, if present, is always a path.
185
 
            if args:
186
 
                context = {'relpath': args[0]}
187
 
            else:
188
 
                context = {}
189
 
            self._translate_error(err, **context)
 
870
        return client.SmartClient(self._medium).call(method, *args)
190
871
 
191
872
    def _call_with_body_bytes(self, method, args, body):
192
873
        """Call a method on the remote server with body bytes."""
193
 
        try:
194
 
            return self._client.call_with_body_bytes(method, args, body)
195
 
        except errors.ErrorFromSmartServer, err:
196
 
            # The first argument, if present, is always a path.
197
 
            if args:
198
 
                context = {'relpath': args[0]}
199
 
            else:
200
 
                context = {}
201
 
            self._translate_error(err, **context)
 
874
        smart_client = client.SmartClient(self._medium)
 
875
        return smart_client.call_with_body_bytes(method, args, body)
202
876
 
203
877
    def has(self, relpath):
204
878
        """Indicate whether a remote file of the given name exists or not.
211
885
        elif resp == ('no', ):
212
886
            return False
213
887
        else:
214
 
            raise errors.UnexpectedSmartServerResponse(resp)
 
888
            self._translate_error(resp)
215
889
 
216
890
    def get(self, relpath):
217
891
        """Return file-like object reading the contents of a remote file.
218
 
 
 
892
        
219
893
        :see: Transport.get_bytes()/get_file()
220
894
        """
221
895
        return StringIO(self.get_bytes(relpath))
222
896
 
223
897
    def get_bytes(self, relpath):
224
898
        remote = self._remote_path(relpath)
225
 
        try:
226
 
            resp, response_handler = self._client.call_expecting_body('get', remote)
227
 
        except errors.ErrorFromSmartServer, err:
228
 
            self._translate_error(err, relpath)
 
899
        request = self._medium.get_request()
 
900
        smart_protocol = protocol.SmartClientRequestProtocolOne(request)
 
901
        smart_protocol.call('get', remote)
 
902
        resp = smart_protocol.read_response_tuple(True)
229
903
        if resp != ('ok', ):
230
 
            response_handler.cancel_read_body()
231
 
            raise errors.UnexpectedSmartServerResponse(resp)
232
 
        return response_handler.read_body_bytes()
 
904
            smart_protocol.cancel_read_body()
 
905
            self._translate_error(resp, relpath)
 
906
        return smart_protocol.read_body_bytes()
233
907
 
234
908
    def _serialise_optional_mode(self, mode):
235
909
        if mode is None:
240
914
    def mkdir(self, relpath, mode=None):
241
915
        resp = self._call2('mkdir', self._remote_path(relpath),
242
916
            self._serialise_optional_mode(mode))
243
 
 
244
 
    def open_write_stream(self, relpath, mode=None):
245
 
        """See Transport.open_write_stream."""
246
 
        self.put_bytes(relpath, "", mode)
247
 
        result = transport.AppendBasedFileStream(self, relpath)
248
 
        transport._file_streams[self.abspath(relpath)] = result
249
 
        return result
 
917
        self._translate_error(resp)
250
918
 
251
919
    def put_bytes(self, relpath, upload_contents, mode=None):
252
920
        # FIXME: upload_file is probably not safe for non-ascii characters -
253
921
        # should probably just pass all parameters as length-delimited
254
922
        # strings?
255
 
        if type(upload_contents) is unicode:
256
 
            # Although not strictly correct, we raise UnicodeEncodeError to be
257
 
            # compatible with other transports.
258
 
            raise UnicodeEncodeError(
259
 
                'undefined', upload_contents, 0, 1,
260
 
                'put_bytes must be given bytes, not unicode.')
261
923
        resp = self._call_with_body_bytes('put',
262
924
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
263
925
            upload_contents)
264
 
        self._ensure_ok(resp)
265
 
        return len(upload_contents)
 
926
        self._translate_error(resp)
266
927
 
267
928
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
268
929
                             create_parent_dir=False,
278
939
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
279
940
             create_parent_str, self._serialise_optional_mode(dir_mode)),
280
941
            bytes)
281
 
        self._ensure_ok(resp)
 
942
        self._translate_error(resp)
282
943
 
283
944
    def put_file(self, relpath, upload_file, mode=None):
284
945
        # its not ideal to seek back, but currently put_non_atomic_file depends
300
961
 
301
962
    def append_file(self, relpath, from_file, mode=None):
302
963
        return self.append_bytes(relpath, from_file.read(), mode)
303
 
 
 
964
        
304
965
    def append_bytes(self, relpath, bytes, mode=None):
305
966
        resp = self._call_with_body_bytes(
306
967
            'append',
308
969
            bytes)
309
970
        if resp[0] == 'appended':
310
971
            return int(resp[1])
311
 
        raise errors.UnexpectedSmartServerResponse(resp)
 
972
        self._translate_error(resp)
312
973
 
313
974
    def delete(self, relpath):
314
975
        resp = self._call2('delete', self._remote_path(relpath))
315
 
        self._ensure_ok(resp)
316
 
 
317
 
    def external_url(self):
318
 
        """See bzrlib.transport.Transport.external_url."""
319
 
        # the external path for RemoteTransports is the base
320
 
        return self.base
321
 
 
322
 
    def recommended_page_size(self):
323
 
        """Return the recommended page size for this transport."""
324
 
        return 64 * 1024
325
 
 
326
 
    def _readv(self, relpath, offsets):
 
976
        self._translate_error(resp)
 
977
 
 
978
    def readv(self, relpath, offsets):
327
979
        if not offsets:
328
980
            return
329
981
 
330
982
        offsets = list(offsets)
331
983
 
332
984
        sorted_offsets = sorted(offsets)
 
985
        # turn the list of offsets into a stack
 
986
        offset_stack = iter(offsets)
 
987
        cur_offset_and_size = offset_stack.next()
333
988
        coalesced = list(self._coalesce_offsets(sorted_offsets,
334
989
                               limit=self._max_readv_combine,
335
 
                               fudge_factor=self._bytes_to_read_before_seek,
336
 
                               max_size=self._max_readv_bytes))
337
 
 
338
 
        # now that we've coallesced things, avoid making enormous requests
339
 
        requests = []
340
 
        cur_request = []
341
 
        cur_len = 0
342
 
        for c in coalesced:
343
 
            if c.length + cur_len > self._max_readv_bytes:
344
 
                requests.append(cur_request)
345
 
                cur_request = [c]
346
 
                cur_len = c.length
347
 
                continue
348
 
            cur_request.append(c)
349
 
            cur_len += c.length
350
 
        if cur_request:
351
 
            requests.append(cur_request)
352
 
        if 'hpss' in debug.debug_flags:
353
 
            trace.mutter('%s.readv %s offsets => %s coalesced'
354
 
                         ' => %s requests (%s)',
355
 
                         self.__class__.__name__, len(offsets), len(coalesced),
356
 
                         len(requests), sum(map(len, requests)))
 
990
                               fudge_factor=self._bytes_to_read_before_seek))
 
991
 
 
992
        request = self._medium.get_request()
 
993
        smart_protocol = protocol.SmartClientRequestProtocolOne(request)
 
994
        smart_protocol.call_with_body_readv_array(
 
995
            ('readv', self._remote_path(relpath)),
 
996
            [(c.start, c.length) for c in coalesced])
 
997
        resp = smart_protocol.read_response_tuple(True)
 
998
 
 
999
        if resp[0] != 'readv':
 
1000
            # This should raise an exception
 
1001
            smart_protocol.cancel_read_body()
 
1002
            self._translate_error(resp)
 
1003
            return
 
1004
 
 
1005
        # FIXME: this should know how many bytes are needed, for clarity.
 
1006
        data = smart_protocol.read_body_bytes()
357
1007
        # Cache the results, but only until they have been fulfilled
358
1008
        data_map = {}
359
 
        # turn the list of offsets into a single stack to iterate
360
 
        offset_stack = iter(offsets)
361
 
        # using a list so it can be modified when passing down and coming back
362
 
        next_offset = [offset_stack.next()]
363
 
        for cur_request in requests:
364
 
            try:
365
 
                result = self._client.call_with_body_readv_array(
366
 
                    ('readv', self._remote_path(relpath),),
367
 
                    [(c.start, c.length) for c in cur_request])
368
 
                resp, response_handler = result
369
 
            except errors.ErrorFromSmartServer, err:
370
 
                self._translate_error(err, relpath)
371
 
 
372
 
            if resp[0] != 'readv':
373
 
                # This should raise an exception
374
 
                response_handler.cancel_read_body()
375
 
                raise errors.UnexpectedSmartServerResponse(resp)
376
 
 
377
 
            for res in self._handle_response(offset_stack, cur_request,
378
 
                                             response_handler,
379
 
                                             data_map,
380
 
                                             next_offset):
381
 
                yield res
382
 
 
383
 
    def _handle_response(self, offset_stack, coalesced, response_handler,
384
 
                         data_map, next_offset):
385
 
        cur_offset_and_size = next_offset[0]
386
 
        # FIXME: this should know how many bytes are needed, for clarity.
387
 
        data = response_handler.read_body_bytes()
388
 
        data_offset = 0
389
1009
        for c_offset in coalesced:
390
1010
            if len(data) < c_offset.length:
391
1011
                raise errors.ShortReadvError(relpath, c_offset.start,
392
1012
                            c_offset.length, actual=len(data))
393
1013
            for suboffset, subsize in c_offset.ranges:
394
1014
                key = (c_offset.start+suboffset, subsize)
395
 
                this_data = data[data_offset+suboffset:
396
 
                                 data_offset+suboffset+subsize]
397
 
                # Special case when the data is in-order, rather than packing
398
 
                # into a map and then back out again. Benchmarking shows that
399
 
                # this has 100% hit rate, but leave in the data_map work just
400
 
                # in case.
401
 
                # TODO: Could we get away with using buffer() to avoid the
402
 
                #       memory copy?  Callers would need to realize they may
403
 
                #       not have a real string.
404
 
                if key == cur_offset_and_size:
405
 
                    yield cur_offset_and_size[0], this_data
406
 
                    cur_offset_and_size = next_offset[0] = offset_stack.next()
407
 
                else:
408
 
                    data_map[key] = this_data
409
 
            data_offset += c_offset.length
 
1015
                data_map[key] = data[suboffset:suboffset+subsize]
 
1016
            data = data[c_offset.length:]
410
1017
 
411
1018
            # Now that we've read some data, see if we can yield anything back
412
1019
            while cur_offset_and_size in data_map:
413
1020
                this_data = data_map.pop(cur_offset_and_size)
414
1021
                yield cur_offset_and_size[0], this_data
415
 
                cur_offset_and_size = next_offset[0] = offset_stack.next()
 
1022
                cur_offset_and_size = offset_stack.next()
416
1023
 
417
1024
    def rename(self, rel_from, rel_to):
418
1025
        self._call('rename',
427
1034
    def rmdir(self, relpath):
428
1035
        resp = self._call('rmdir', self._remote_path(relpath))
429
1036
 
430
 
    def _ensure_ok(self, resp):
431
 
        if resp[0] != 'ok':
432
 
            raise errors.UnexpectedSmartServerResponse(resp)
433
 
 
434
 
    def _translate_error(self, err, relpath=None):
435
 
        remote._translate_error(err, path=relpath)
 
1037
    def _translate_error(self, resp, orig_path=None):
 
1038
        """Raise an exception from a response"""
 
1039
        if resp is None:
 
1040
            what = None
 
1041
        else:
 
1042
            what = resp[0]
 
1043
        if what == 'ok':
 
1044
            return
 
1045
        elif what == 'NoSuchFile':
 
1046
            if orig_path is not None:
 
1047
                error_path = orig_path
 
1048
            else:
 
1049
                error_path = resp[1]
 
1050
            raise errors.NoSuchFile(error_path)
 
1051
        elif what == 'error':
 
1052
            raise errors.SmartProtocolError(unicode(resp[1]))
 
1053
        elif what == 'FileExists':
 
1054
            raise errors.FileExists(resp[1])
 
1055
        elif what == 'DirectoryNotEmpty':
 
1056
            raise errors.DirectoryNotEmpty(resp[1])
 
1057
        elif what == 'ShortReadvError':
 
1058
            raise errors.ShortReadvError(resp[1], int(resp[2]),
 
1059
                                         int(resp[3]), int(resp[4]))
 
1060
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
 
1061
            encoding = str(resp[1]) # encoding must always be a string
 
1062
            val = resp[2]
 
1063
            start = int(resp[3])
 
1064
            end = int(resp[4])
 
1065
            reason = str(resp[5]) # reason must always be a string
 
1066
            if val.startswith('u:'):
 
1067
                val = val[2:].decode('utf-8')
 
1068
            elif val.startswith('s:'):
 
1069
                val = val[2:].decode('base64')
 
1070
            if what == 'UnicodeDecodeError':
 
1071
                raise UnicodeDecodeError(encoding, val, start, end, reason)
 
1072
            elif what == 'UnicodeEncodeError':
 
1073
                raise UnicodeEncodeError(encoding, val, start, end, reason)
 
1074
        elif what == "ReadOnlyError":
 
1075
            raise errors.TransportNotPossible('readonly transport')
 
1076
        else:
 
1077
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
436
1078
 
437
1079
    def disconnect(self):
438
 
        self.get_smart_medium().disconnect()
 
1080
        self._medium.disconnect()
 
1081
 
 
1082
    def delete_tree(self, relpath):
 
1083
        raise errors.TransportNotPossible('readonly transport')
439
1084
 
440
1085
    def stat(self, relpath):
441
1086
        resp = self._call2('stat', self._remote_path(relpath))
442
1087
        if resp[0] == 'stat':
443
1088
            return _SmartStat(int(resp[1]), int(resp[2], 8))
444
 
        raise errors.UnexpectedSmartServerResponse(resp)
 
1089
        else:
 
1090
            self._translate_error(resp)
445
1091
 
446
1092
    ## def lock_read(self, relpath):
447
1093
    ##     """Lock the given file for shared (read) access.
463
1109
        resp = self._call2('list_dir', self._remote_path(relpath))
464
1110
        if resp[0] == 'names':
465
1111
            return [name.encode('ascii') for name in resp[1:]]
466
 
        raise errors.UnexpectedSmartServerResponse(resp)
 
1112
        else:
 
1113
            self._translate_error(resp)
467
1114
 
468
1115
    def iter_files_recursive(self):
469
1116
        resp = self._call2('iter_files_recursive', self._remote_path(''))
470
1117
        if resp[0] == 'names':
471
1118
            return resp[1:]
472
 
        raise errors.UnexpectedSmartServerResponse(resp)
473
 
 
474
 
 
475
 
class RemoteTCPTransport(RemoteTransport):
 
1119
        else:
 
1120
            self._translate_error(resp)
 
1121
 
 
1122
 
 
1123
class SmartTCPTransport(RemoteTransport):
476
1124
    """Connection to smart server over plain tcp.
477
 
 
 
1125
    
478
1126
    This is essentially just a factory to get 'RemoteTransport(url,
479
1127
        SmartTCPClientMedium).
480
1128
    """
481
1129
 
482
 
    def _build_medium(self):
483
 
        client_medium = medium.SmartTCPClientMedium(
484
 
            self._host, self._port, self.base)
485
 
        return client_medium, None
486
 
 
487
 
 
488
 
class RemoteTCPTransportV2Only(RemoteTransport):
489
 
    """Connection to smart server over plain tcp with the client hard-coded to
490
 
    assume protocol v2 and remote server version <= 1.6.
491
 
 
492
 
    This should only be used for testing.
493
 
    """
494
 
 
495
 
    def _build_medium(self):
496
 
        client_medium = medium.SmartTCPClientMedium(
497
 
            self._host, self._port, self.base)
498
 
        client_medium._protocol_version = 2
499
 
        client_medium._remember_remote_is_before((1, 6))
500
 
        return client_medium, None
501
 
 
502
 
 
503
 
class RemoteSSHTransport(RemoteTransport):
 
1130
    def __init__(self, url):
 
1131
        _scheme, _username, _password, _host, _port, _path = \
 
1132
            transport.split_url(url)
 
1133
        if _port is None:
 
1134
            _port = BZR_DEFAULT_PORT
 
1135
        else:
 
1136
            try:
 
1137
                _port = int(_port)
 
1138
            except (ValueError, TypeError), e:
 
1139
                raise errors.InvalidURL(
 
1140
                    path=url, extra="invalid port %s" % _port)
 
1141
        client_medium = medium.SmartTCPClientMedium(_host, _port)
 
1142
        super(SmartTCPTransport, self).__init__(url, medium=client_medium)
 
1143
 
 
1144
 
 
1145
class SmartSSHTransport(RemoteTransport):
504
1146
    """Connection to smart server over SSH.
505
1147
 
506
1148
    This is essentially just a factory to get 'RemoteTransport(url,
507
1149
        SmartSSHClientMedium).
508
1150
    """
509
1151
 
510
 
    def _build_medium(self):
511
 
        location_config = config.LocationConfig(self.base)
512
 
        bzr_remote_path = location_config.get_bzr_remote_path()
513
 
        user = self._user
514
 
        if user is None:
515
 
            auth = config.AuthenticationConfig()
516
 
            user = auth.get_user('ssh', self._host, self._port)
517
 
        client_medium = medium.SmartSSHClientMedium(self._host, self._port,
518
 
            user, self._password, self.base,
519
 
            bzr_remote_path=bzr_remote_path)
520
 
        return client_medium, (user, self._password)
521
 
 
522
 
 
523
 
class RemoteHTTPTransport(RemoteTransport):
 
1152
    def __init__(self, url):
 
1153
        _scheme, _username, _password, _host, _port, _path = \
 
1154
            transport.split_url(url)
 
1155
        try:
 
1156
            if _port is not None:
 
1157
                _port = int(_port)
 
1158
        except (ValueError, TypeError), e:
 
1159
            raise errors.InvalidURL(path=url, extra="invalid port %s" % 
 
1160
                _port)
 
1161
        client_medium = medium.SmartSSHClientMedium(_host, _port,
 
1162
                                                    _username, _password)
 
1163
        super(SmartSSHTransport, self).__init__(url, medium=client_medium)
 
1164
 
 
1165
 
 
1166
class SmartHTTPTransport(RemoteTransport):
524
1167
    """Just a way to connect between a bzr+http:// url and http://.
525
 
 
526
 
    This connection operates slightly differently than the RemoteSSHTransport.
 
1168
    
 
1169
    This connection operates slightly differently than the SmartSSHTransport.
527
1170
    It uses a plain http:// transport underneath, which defines what remote
528
1171
    .bzr/smart URL we are connected to. From there, all paths that are sent are
529
1172
    sent as relative paths, this way, the remote side can properly
531
1174
    HTTP path into a local path.
532
1175
    """
533
1176
 
534
 
    def __init__(self, base, _from_transport=None, http_transport=None):
 
1177
    def __init__(self, url, http_transport=None):
 
1178
        assert url.startswith('bzr+http://')
 
1179
 
535
1180
        if http_transport is None:
536
 
            # FIXME: the password may be lost here because it appears in the
537
 
            # url only for an intial construction (when the url came from the
538
 
            # command-line).
539
 
            http_url = base[len('bzr+'):]
 
1181
            http_url = url[len('bzr+'):]
540
1182
            self._http_transport = transport.get_transport(http_url)
541
1183
        else:
542
1184
            self._http_transport = http_transport
543
 
        super(RemoteHTTPTransport, self).__init__(
544
 
            base, _from_transport=_from_transport)
545
 
 
546
 
    def _build_medium(self):
547
 
        # We let http_transport take care of the credentials
548
 
        return self._http_transport.get_smart_medium(), None
 
1185
        http_medium = self._http_transport.get_smart_medium()
 
1186
        super(SmartHTTPTransport, self).__init__(url, medium=http_medium)
549
1187
 
550
1188
    def _remote_path(self, relpath):
551
 
        """After connecting, HTTP Transport only deals in relative URLs."""
552
 
        # Adjust the relpath based on which URL this smart transport is
553
 
        # connected to.
554
 
        http_base = urlutils.normalize_url(self.get_smart_medium().base)
555
 
        url = urlutils.join(self.base[len('bzr+'):], relpath)
556
 
        url = urlutils.normalize_url(url)
557
 
        return urlutils.relative_url(http_base, url)
 
1189
        """After connecting HTTP Transport only deals in relative URLs."""
 
1190
        if relpath == '.':
 
1191
            return ''
 
1192
        else:
 
1193
            return relpath
 
1194
 
 
1195
    def abspath(self, relpath):
 
1196
        """Return the full url to the given relative path.
 
1197
        
 
1198
        :param relpath: the relative path or path components
 
1199
        :type relpath: str or list
 
1200
        """
 
1201
        return self._unparse_url(self._combine_paths(self._path, relpath))
558
1202
 
559
1203
    def clone(self, relative_url):
560
 
        """Make a new RemoteHTTPTransport related to me.
 
1204
        """Make a new SmartHTTPTransport related to me.
561
1205
 
562
1206
        This is re-implemented rather than using the default
563
 
        RemoteTransport.clone() because we must be careful about the underlying
 
1207
        SmartTransport.clone() because we must be careful about the underlying
564
1208
        http transport.
565
 
 
566
 
        Also, the cloned smart transport will POST to the same .bzr/smart
567
 
        location as this transport (although obviously the relative paths in the
568
 
        smart requests may be different).  This is so that the server doesn't
569
 
        have to handle .bzr/smart requests at arbitrary places inside .bzr
570
 
        directories, just at the initial URL the user uses.
571
1209
        """
572
1210
        if relative_url:
573
1211
            abs_url = self.abspath(relative_url)
574
1212
        else:
575
1213
            abs_url = self.base
576
 
        return RemoteHTTPTransport(abs_url,
577
 
                                   _from_transport=self,
578
 
                                   http_transport=self._http_transport)
579
 
 
580
 
    def _redirected_to(self, source, target):
581
 
        """See transport._redirected_to"""
582
 
        redirected = self._http_transport._redirected_to(source, target)
583
 
        if (redirected is not None
584
 
            and isinstance(redirected, type(self._http_transport))):
585
 
            return RemoteHTTPTransport('bzr+' + redirected.external_url(),
586
 
                                       http_transport=redirected)
587
 
        else:
588
 
            # Either None or a transport for a different protocol
589
 
            return redirected
590
 
 
591
 
 
592
 
class HintingSSHTransport(transport.Transport):
593
 
    """Simple transport that handles ssh:// and points out bzr+ssh://."""
594
 
 
595
 
    def __init__(self, url):
596
 
        raise errors.UnsupportedProtocol(url,
597
 
            'bzr supports bzr+ssh to operate over ssh, use "bzr+%s".' % url)
 
1214
        # By cloning the underlying http_transport, we are able to share the
 
1215
        # connection.
 
1216
        new_transport = self._http_transport.clone(relative_url)
 
1217
        return SmartHTTPTransport(abs_url, http_transport=new_transport)
598
1218
 
599
1219
 
600
1220
def get_test_permutations():
601
1221
    """Return (transport, server) permutations for testing."""
602
1222
    ### We may need a little more test framework support to construct an
603
1223
    ### appropriate RemoteTransport in the future.
604
 
    from bzrlib.tests import test_server
605
 
    return [(RemoteTCPTransport, test_server.SmartTCPServer_for_testing)]
 
1224
    from bzrlib.smart import server
 
1225
    return [(SmartTCPTransport, server.SmartTCPServer_for_testing)]