/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

[broken] some support for write operations over hpss

Show diffs side-by-side

added added

removed removed

Lines of Context:
180
180
    to_file.flush()
181
181
 
182
182
 
183
 
# TODO: this only actually accomodates a single block; possibly should support
184
 
# multiple chunks?
185
 
def _recv_bulk(from_file):
186
 
    chunk_len = from_file.readline()
187
 
    try:
188
 
        chunk_len = int(chunk_len)
189
 
    except ValueError:
190
 
        raise BzrProtocolError("bad chunk length line %r" % chunk_len)
191
 
    bulk = from_file.read(chunk_len)
192
 
    if len(bulk) != chunk_len:
193
 
        raise BzrProtocolError("short read fetching bulk data chunk")
194
 
    return bulk
195
 
 
196
 
 
197
 
class SmartStreamServer(object):
 
183
class SmartProtocolBase(object):
 
184
    """Methods common to client and server"""
 
185
 
 
186
    def _send_bulk_data(self, body):
 
187
        """Send chunked body data"""
 
188
        assert isinstance(body, str)
 
189
        self._out.write('%d\n' % len(body))
 
190
        self._out.write(body)
 
191
        self._out.write('done\n')
 
192
        self._out.flush()
 
193
 
 
194
    # TODO: this only actually accomodates a single block; possibly should support
 
195
    # multiple chunks?
 
196
    def _recv_bulk(self):
 
197
        chunk_len = self._in.readline()
 
198
        try:
 
199
            chunk_len = int(chunk_len)
 
200
        except ValueError:
 
201
            raise BzrProtocolError("bad chunk length line %r" % chunk_len)
 
202
        bulk = self._in.read(chunk_len)
 
203
        if len(bulk) != chunk_len:
 
204
            raise BzrProtocolError("short read fetching bulk data chunk")
 
205
        self._recv_trailer()
 
206
        return bulk
 
207
 
 
208
    def _recv_tuple(self):
 
209
        return _recv_tuple(self._in)
 
210
 
 
211
    def _recv_trailer(self):
 
212
        resp = self._recv_tuple()
 
213
        if resp == ('done', ):
 
214
            return
 
215
        else:
 
216
            self._translate_error(resp)
 
217
 
 
218
 
 
219
class SmartStreamServer(SmartProtocolBase):
198
220
    """Handles smart commands coming over a stream.
199
221
 
200
222
    The stream may be a pipe connected to sshd, or a tcp socket, or an
217
239
        self._in = in_file
218
240
        self._out = out_file
219
241
        self.smart_server = SmartServer(backing_transport)
 
242
        # server can call back to us to get bulk data - this is not really
 
243
        # ideal, they should get it per request instead
 
244
        self.smart_server._recv_body = self._recv_bulk
220
245
 
221
246
    def _recv_tuple(self):
222
247
        """Read a request from the client and return as a tuple.
229
254
        """Send response header"""
230
255
        return _send_tuple(self._out, args)
231
256
 
232
 
    def _send_bulk_data(self, body):
233
 
        """Send chunked body data"""
234
 
        assert isinstance(body, str)
235
 
        self._out.write('%d\n' % len(body))
236
 
        self._out.write(body)
237
 
        self._out.write('done\n')
238
 
        self._out.flush()
239
 
 
240
257
    def _send_error_and_disconnect(self, exception):
241
258
        self._send_tuple(('error', str(exception)))
242
259
        self._out.flush()
243
 
        self._out.close()
244
 
        self._in.close()
 
260
        ## self._out.close()
 
261
        ## self._in.close()
245
262
 
246
263
    def _serve_one_request(self):
247
264
        """Read one request from input, process, send back a response.
259
276
                self._send_bulk_data(response.body)
260
277
        except errors.NoSuchFile, e:
261
278
            self._send_tuple(('enoent', e.path))
 
279
        except errors.FileExists, e:
 
280
            self._send_tuple(('FileExists', e.path))
262
281
        except KeyboardInterrupt:
263
282
            raise
264
283
        except Exception, e:
291
310
    creates responses.
292
311
    """
293
312
 
 
313
    # TODO: Better way of representing the body for commands that take it,
 
314
    # and allow it to be streamed into the server.
 
315
    
294
316
    def __init__(self, backing_transport):
295
317
        self._backing_transport = backing_transport
296
318
        
306
328
        backing_file = self._backing_transport.get(relpath)
307
329
        return SmartServerResponse(('ok',), backing_file.read())
308
330
 
 
331
    def _optional_mode(self, mode):
 
332
        if mode == '':
 
333
            return None
 
334
        else:
 
335
            return int(mode)
 
336
 
 
337
    def do_mkdir(self, relpath, mode):
 
338
        self._backing_transport.mkdir(relpath, self._optional_mode(mode))
 
339
        return SmartServerResponse(('ok',))
 
340
 
 
341
    def do_put(self, relpath, mode):
 
342
        self._backing_transport.put(relpath, 
 
343
                StringIO(self._recv_body()), 
 
344
                self._optional_mode(mode))
 
345
        return SmartServerResponse(('ok',))
 
346
 
 
347
    def do_append(self, relpath, mode):
 
348
        old_length = self._backing_transport.append(relpath, StringIO(self._recv_body()),
 
349
                self._optional_mode(mode))
 
350
        return SmartServerResponse(('appended', '%d' % old_length))
 
351
 
309
352
    def do_get_bundle(self, path, revision_id):
310
353
        # open transport relative to our base
311
354
        t = self._backing_transport.clone(path)
456
499
        return SmartTransport(new_url, clone_from=self)
457
500
 
458
501
    def is_readonly(self):
459
 
        """Smart protocol currently only supports readonly operations."""
460
 
        return True
461
 
 
462
 
    def get_smart_client(self):
463
 
        return self._client
464
 
    
465
 
    def _unparse_url(self, path):
466
 
        """Return URL for a path.
 
502
        """Smart server transport can do read/write file operations."""
 
503
        return False
 
504
                                                   
 
505
    def get_smart_client(self):                    
 
506
        return self._client                        
 
507
                                                   
 
508
    def _unparse_url(self, path):                  
 
509
        """Return URL for a path.                 l
467
510
 
468
511
        :see: SFTPUrlHandling._unparse_url
469
512
        """
487
530
 
488
531
        :see: Transport.has()
489
532
        """
490
 
        resp = self._call('has', self._remote_path(relpath))
 
533
        resp = self._client._call('has', self._remote_path(relpath))
491
534
        if resp == ('yes', ):
492
535
            return True
493
536
        elif resp == ('no', ):
503
546
        mutter("%s.get %s", self, relpath)
504
547
        remote = self._remote_path(relpath)
505
548
        mutter("  remote path: %s", remote)
506
 
        resp = self._call('get', remote)
 
549
        resp = self._client._call('get', remote)
507
550
        if resp != ('ok', ):
508
551
            self._translate_error(resp)
509
 
        body = self._recv_bulk()
510
 
        self._recv_trailer()
511
 
        ret = StringIO(body)
512
 
        ## print '  got %d bytes: %s' % (len(body), body[:30])
513
 
        return ret
 
552
        return StringIO(self._client._recv_bulk())
514
553
 
515
 
    def _recv_trailer(self):
516
 
        resp = self._recv_tuple()
517
 
        if resp == ('done', ):
518
 
            return
 
554
    def _optional_mode(self, mode):
 
555
        if mode is None:
 
556
            return ''
519
557
        else:
520
 
            self._translate_error(resp)
521
 
 
522
 
    def _call(self, *args):
523
 
        self._send_tuple(args)
524
 
        return self._recv_tuple()
 
558
            return '%d' % mode
 
559
 
 
560
    def mkdir(self, relpath, mode=None):
 
561
        resp = self._client._call('mkdir', 
 
562
                                  self._remote_path(relpath), 
 
563
                                  self._optional_mode(mode))
 
564
        self._translate_error(resp)
 
565
 
 
566
    def put(self, relpath, upload_file, mode=None):
 
567
        # FIXME: upload_file is probably not safe for non-ascii characters -
 
568
        # should probably just pass all parameters as length-delimited
 
569
        # strings?
 
570
        resp = self._client._call_with_upload('put', 
 
571
                                              (self._remote_path(relpath), 
 
572
                                               self._optional_mode(mode)),
 
573
                                              upload_file.read())
 
574
        self._translate_error(resp)
 
575
 
 
576
    def append(self, relpath, from_file, mode=None):
 
577
        resp = self._client._call_with_upload('append',
 
578
                                              (self._remote_path(relpath), 
 
579
                                               self._optional_mode(mode)),
 
580
                                              from_file.read())
 
581
        if resp[0] == 'appended':
 
582
            return int(resp[1])
 
583
        self._translate_error(resp)
525
584
 
526
585
    def _translate_error(self, resp):
527
586
        """Raise an exception from a response"""
528
587
        what = resp[0]
529
 
        if what == 'enoent':
 
588
        if what == 'ok':
 
589
            return
 
590
        elif what == 'enoent':
530
591
            raise errors.NoSuchFile(resp[1])
 
592
        elif what == 'error':
 
593
            raise BzrProtocolError(unicode(resp[1]))
 
594
        elif what == 'FileExists':
 
595
            raise errors.FileExists(resp[1])
531
596
        else:
532
 
            raise BzrProtocolError('bad trailer on get: %r' % (resp,))
533
 
 
534
 
    def _recv_bulk(self):
535
 
        return self._client._recv_bulk()
 
597
            raise BzrProtocolError('unexpected smart server error: %r' % (resp,))
536
598
 
537
599
    def _send_tuple(self, args):
538
600
        self._client._send_tuple(args)
543
605
    def disconnect(self):
544
606
        self._client.disconnect()
545
607
 
546
 
    def append(self, relpath, from_file):
547
 
        raise errors.TransportNotPossible("writing to smart servers not supported yet")
548
 
 
549
608
    def delete(self, relpath):
550
609
        raise errors.TransportNotPossible('readonly transport')
551
610
 
552
611
    def delete_tree(self, relpath):
553
612
        raise errors.TransportNotPossible('readonly transport')
554
613
 
555
 
    def put(self, relpath, f, mode=None):
556
 
        raise errors.TransportNotPossible('readonly transport')
557
 
 
558
 
    def mkdir(self, relpath, mode=None):
559
 
        raise errors.TransportNotPossible('readonly transport')
560
 
 
561
614
    def rmdir(self, relpath):
562
615
        raise errors.TransportNotPossible('readonly transport')
563
616
 
584
637
        return BogusLock(relpath)
585
638
 
586
639
 
587
 
class SmartStreamClient(object):
 
640
class SmartStreamClient(SmartProtocolBase):
588
641
    """Connection to smart server over two streams"""
589
642
 
590
643
    def __init__(self, from_server, to_server):
591
 
        self._from_server = from_server
592
 
        self._to_server = to_server
 
644
        self._in = from_server
 
645
        self._out = to_server
593
646
 
594
647
    def __del__(self):
595
648
        self.disconnect()
596
649
 
597
650
    def _send_tuple(self, args):
598
 
        _send_tuple(self._to_server, args)
 
651
        _send_tuple(self._out, args)
599
652
 
600
653
    def disconnect(self):
601
654
        """Close connection to the server"""
602
 
        self._to_server.close()
603
 
        self._from_server.close()
604
 
 
605
 
    def _recv_bulk(self):
606
 
        return _recv_bulk(self._from_server)
607
 
 
608
 
    def _recv_tuple(self):
609
 
        return _recv_tuple(self._from_server)
 
655
        if getattr(self, '_out'):
 
656
            self._out.close()
 
657
        if getattr(self, '_in'):
 
658
            self._in.close()
 
659
 
 
660
    def _call(self, *args):
 
661
        self._send_tuple(args)
 
662
        return self._recv_tuple()
 
663
 
 
664
    def _call_with_upload(self, method, args, body):
 
665
        """Call an rpc, supplying bulk upload data.
 
666
 
 
667
        :param method: method name to call
 
668
        :param args: parameter args tuple
 
669
        :param body: upload body as a byte string
 
670
        """
 
671
        self._send_tuple((method,) + args)
 
672
        self._send_bulk_data(body)
 
673
        return self._recv_tuple()
610
674
 
611
675
    def query_version(self):
612
676
        """Return protocol version number of the server."""