/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Martin Pool
  • Date: 2009-03-13 07:54:48 UTC
  • mfrom: (4144 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4189.
  • Revision ID: mbp@sourcefrog.net-20090313075448-jlz1t7baz7gzipqn
merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
40
40
    osutils,
41
41
    symbol_versioning,
42
42
    trace,
 
43
    ui,
43
44
    urlutils,
44
45
    )
45
46
from bzrlib.smart import client, protocol
87
88
 
88
89
def _get_line(read_bytes_func):
89
90
    """Read bytes using read_bytes_func until a newline byte.
90
 
    
 
91
 
91
92
    This isn't particularly efficient, so should only be used when the
92
93
    expected size of the line is quite short.
93
 
    
 
94
 
94
95
    :returns: a tuple of two strs: (line, excess)
95
96
    """
96
97
    newline_pos = -1
112
113
 
113
114
    def __init__(self):
114
115
        self._push_back_buffer = None
115
 
        
 
116
 
116
117
    def _push_back(self, bytes):
117
118
        """Return unused bytes to the medium, because they belong to the next
118
119
        request(s).
152
153
 
153
154
    def _get_line(self):
154
155
        """Read bytes from this request's response until a newline byte.
155
 
        
 
156
 
156
157
        This isn't particularly efficient, so should only be used when the
157
158
        expected size of the line is quite short.
158
159
 
161
162
        line, excess = _get_line(self.read_bytes)
162
163
        self._push_back(excess)
163
164
        return line
164
 
 
 
165
 
 
166
    def _report_activity(self, bytes, direction):
 
167
        """Notify that this medium has activity.
 
168
 
 
169
        Implementations should call this from all methods that actually do IO.
 
170
        Be careful that it's not called twice, if one method is implemented on
 
171
        top of another.
 
172
 
 
173
        :param bytes: Number of bytes read or written.
 
174
        :param direction: 'read' or 'write' or None.
 
175
        """
 
176
        ui.ui_factory.report_transport_activity(self, bytes, direction)
 
177
 
165
178
 
166
179
class SmartServerStreamMedium(SmartMedium):
167
180
    """Handles smart commands coming over a stream.
172
185
    One instance is created for each connected client; it can serve multiple
173
186
    requests in the lifetime of the connection.
174
187
 
175
 
    The server passes requests through to an underlying backing transport, 
 
188
    The server passes requests through to an underlying backing transport,
176
189
    which will typically be a LocalTransport looking at the server's filesystem.
177
190
 
178
191
    :ivar _push_back_buffer: a str of bytes that have been read from the stream
223
236
 
224
237
    def _serve_one_request(self, protocol):
225
238
        """Read one request from input, process, send back a response.
226
 
        
 
239
 
227
240
        :param protocol: a SmartServerRequestProtocol.
228
241
        """
229
242
        try:
268
281
                self.finished = True
269
282
                return
270
283
            protocol.accept_bytes(bytes)
271
 
        
 
284
 
272
285
        self._push_back(protocol.unused_data)
273
286
 
274
287
    def _read_bytes(self, desired_count):
275
288
        # We ignore the desired_count because on sockets it's more efficient to
276
289
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
277
 
        return osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
 
290
        bytes = osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
 
291
        self._report_activity(len(bytes), 'read')
 
292
        return bytes
278
293
 
279
294
    def terminate_due_to_error(self):
280
295
        # TODO: This should log to a server log file, but no such thing
283
298
        self.finished = True
284
299
 
285
300
    def _write_out(self, bytes):
286
 
        osutils.send_all(self.socket, bytes)
 
301
        osutils.send_all(self.socket, bytes, self._report_activity)
287
302
 
288
303
 
289
304
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
350
365
    request.finished_reading()
351
366
 
352
367
    It is up to the individual SmartClientMedium whether multiple concurrent
353
 
    requests can exist. See SmartClientMedium.get_request to obtain instances 
354
 
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
368
    requests can exist. See SmartClientMedium.get_request to obtain instances
 
369
    of SmartClientMediumRequest, and the concrete Medium you are using for
355
370
    details on concurrency and pipelining.
356
371
    """
357
372
 
403
418
    def _finished_reading(self):
404
419
        """Helper for finished_reading.
405
420
 
406
 
        finished_reading checks the state of the request to determine if 
 
421
        finished_reading checks the state of the request to determine if
407
422
        finished_reading is allowed, and if it is hands off to _finished_reading
408
423
        to perform the action.
409
424
        """
423
438
    def _finished_writing(self):
424
439
        """Helper for finished_writing.
425
440
 
426
 
        finished_writing checks the state of the request to determine if 
 
441
        finished_writing checks the state of the request to determine if
427
442
        finished_writing is allowed, and if it is hands off to _finished_writing
428
443
        to perform the action.
429
444
        """
449
464
        read_bytes checks the state of the request to determing if bytes
450
465
        should be read. After that it hands off to _read_bytes to do the
451
466
        actual read.
452
 
        
 
467
 
453
468
        By default this forwards to self._medium.read_bytes because we are
454
469
        operating on the medium's stream.
455
470
        """
460
475
        if not line.endswith('\n'):
461
476
            # end of file encountered reading from server
462
477
            raise errors.ConnectionReset(
463
 
                "please check connectivity and permissions",
464
 
                "(and try -Dhpss if further diagnosis is required)")
 
478
                "please check connectivity and permissions")
465
479
        return line
466
480
 
467
481
    def _read_line(self):
468
482
        """Helper for SmartClientMediumRequest.read_line.
469
 
        
 
483
 
470
484
        By default this forwards to self._medium._get_line because we are
471
485
        operating on the medium's stream.
472
486
        """
516
530
        value[0] = 0
517
531
        if count != 0:
518
532
            trace.note('HPSS calls: %d %s', count, medium_repr)
519
 
        
 
533
 
520
534
    def flush_all(self):
521
535
        for ref in list(self.counts.keys()):
522
536
            self.done(ref)
523
537
 
524
538
_debug_counter = None
525
 
  
526
 
  
 
539
 
 
540
 
527
541
class SmartClientMedium(SmartMedium):
528
542
    """Smart client is a medium for sending smart protocol requests over."""
529
543
 
574
588
        """
575
589
        if (self._remote_version_is_before is not None and
576
590
            version_tuple > self._remote_version_is_before):
 
591
            # We have been told that the remote side is older than some version
 
592
            # which is newer than a previously supplied older-than version.
 
593
            # This indicates that some smart verb call is not guarded
 
594
            # appropriately (it should simply not have been tried).
577
595
            raise AssertionError(
578
596
                "_remember_remote_is_before(%r) called, but "
579
597
                "_remember_remote_is_before(%r) was called previously."
617
635
 
618
636
    def disconnect(self):
619
637
        """If this medium maintains a persistent connection, close it.
620
 
        
 
638
 
621
639
        The default implementation does nothing.
622
640
        """
623
 
        
 
641
 
624
642
    def remote_path_from_transport(self, transport):
625
643
        """Convert transport into a path suitable for using in a request.
626
 
        
 
644
 
627
645
        Note that the resulting remote path doesn't encode the host name or
628
646
        anything but path, so it is only safe to use it in requests sent over
629
647
        the medium from the matching transport.
657
675
 
658
676
    def _flush(self):
659
677
        """Flush the output stream.
660
 
        
 
678
 
661
679
        This method is used by the SmartClientStreamMediumRequest to ensure that
662
680
        all data for a request is sent, to avoid long timeouts or deadlocks.
663
681
        """
674
692
 
675
693
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
676
694
    """A client medium using simple pipes.
677
 
    
 
695
 
678
696
    This client does not manage the pipes: it assumes they will always be open.
679
697
    """
680
698
 
686
704
    def _accept_bytes(self, bytes):
687
705
        """See SmartClientStreamMedium.accept_bytes."""
688
706
        self._writeable_pipe.write(bytes)
 
707
        self._report_activity(len(bytes), 'write')
689
708
 
690
709
    def _flush(self):
691
710
        """See SmartClientStreamMedium._flush()."""
693
712
 
694
713
    def _read_bytes(self, count):
695
714
        """See SmartClientStreamMedium._read_bytes."""
696
 
        return self._readable_pipe.read(count)
 
715
        bytes = self._readable_pipe.read(count)
 
716
        self._report_activity(len(bytes), 'read')
 
717
        return bytes
697
718
 
698
719
 
699
720
class SmartSSHClientMedium(SmartClientStreamMedium):
700
721
    """A client medium using SSH."""
701
 
    
 
722
 
702
723
    def __init__(self, host, port=None, username=None, password=None,
703
724
            base=None, vendor=None, bzr_remote_path=None):
704
725
        """Creates a client that will connect on the first use.
705
 
        
 
726
 
706
727
        :param vendor: An optional override for the ssh vendor to use. See
707
728
            bzrlib.transport.ssh for details on ssh vendors.
708
729
        """
709
 
        SmartClientStreamMedium.__init__(self, base)
710
730
        self._connected = False
711
731
        self._host = host
712
732
        self._password = password
713
733
        self._port = port
714
734
        self._username = username
 
735
        # SmartClientStreamMedium stores the repr of this object in its
 
736
        # _DebugCounter so we have to store all the values used in our repr
 
737
        # method before calling the super init.
 
738
        SmartClientStreamMedium.__init__(self, base)
715
739
        self._read_from = None
716
740
        self._ssh_connection = None
717
741
        self._vendor = vendor
718
742
        self._write_to = None
719
743
        self._bzr_remote_path = bzr_remote_path
720
 
        if self._bzr_remote_path is None:
721
 
            symbol_versioning.warn(
722
 
                'bzr_remote_path is required as of bzr 0.92',
723
 
                DeprecationWarning, stacklevel=2)
724
 
            self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
744
        # for the benefit of progress making a short description of this
 
745
        # transport
 
746
        self._scheme = 'bzr+ssh'
 
747
 
 
748
    def __repr__(self):
 
749
        return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
 
750
            self.__class__.__name__,
 
751
            self._connected,
 
752
            self._username,
 
753
            self._host,
 
754
            self._port)
725
755
 
726
756
    def _accept_bytes(self, bytes):
727
757
        """See SmartClientStreamMedium.accept_bytes."""
728
758
        self._ensure_connection()
729
759
        self._write_to.write(bytes)
 
760
        self._report_activity(len(bytes), 'write')
730
761
 
731
762
    def disconnect(self):
732
763
        """See SmartClientMedium.disconnect()."""
762
793
        if not self._connected:
763
794
            raise errors.MediumNotConnected(self)
764
795
        bytes_to_read = min(count, _MAX_READ_SIZE)
765
 
        return self._read_from.read(bytes_to_read)
 
796
        bytes = self._read_from.read(bytes_to_read)
 
797
        self._report_activity(len(bytes), 'read')
 
798
        return bytes
766
799
 
767
800
 
768
801
# Port 4155 is the default port for bzr://, registered with IANA.
772
805
 
773
806
class SmartTCPClientMedium(SmartClientStreamMedium):
774
807
    """A client medium using TCP."""
775
 
    
 
808
 
776
809
    def __init__(self, host, port, base):
777
810
        """Creates a client that will connect on the first use."""
778
811
        SmartClientStreamMedium.__init__(self, base)
784
817
    def _accept_bytes(self, bytes):
785
818
        """See SmartClientMedium.accept_bytes."""
786
819
        self._ensure_connection()
787
 
        osutils.send_all(self._socket, bytes)
 
820
        osutils.send_all(self._socket, bytes, self._report_activity)
788
821
 
789
822
    def disconnect(self):
790
823
        """See SmartClientMedium.disconnect()."""
803
836
        else:
804
837
            port = int(self._port)
805
838
        try:
806
 
            sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC, 
 
839
            sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
807
840
                socket.SOCK_STREAM, 0, 0)
808
841
        except socket.gaierror, (err_num, err_msg):
809
842
            raise errors.ConnectionError("failed to lookup %s:%d: %s" %
813
846
        for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
814
847
            try:
815
848
                self._socket = socket.socket(family, socktype, proto)
816
 
                self._socket.setsockopt(socket.IPPROTO_TCP, 
 
849
                self._socket.setsockopt(socket.IPPROTO_TCP,
817
850
                                        socket.TCP_NODELAY, 1)
818
851
                self._socket.connect(sockaddr)
819
852
            except socket.error, err:
835
868
 
836
869
    def _flush(self):
837
870
        """See SmartClientStreamMedium._flush().
838
 
        
839
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
871
 
 
872
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
840
873
        add a means to do a flush, but that can be done in the future.
841
874
        """
842
875
 
847
880
        # We ignore the desired_count because on sockets it's more efficient to
848
881
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
849
882
        try:
850
 
            return self._socket.recv(_MAX_READ_SIZE)
 
883
            bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
851
884
        except socket.error, e:
852
885
            if len(e.args) and e.args[0] == errno.ECONNRESET:
853
886
                # Callers expect an empty string in that case
854
887
                return ''
855
888
            else:
856
889
                raise
 
890
        else:
 
891
            self._report_activity(len(bytes), 'read')
 
892
            return bytes
857
893
 
858
894
 
859
895
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
872
908
 
873
909
    def _accept_bytes(self, bytes):
874
910
        """See SmartClientMediumRequest._accept_bytes.
875
 
        
 
911
 
876
912
        This forwards to self._medium._accept_bytes because we are operating
877
913
        on the mediums stream.
878
914
        """
881
917
    def _finished_reading(self):
882
918
        """See SmartClientMediumRequest._finished_reading.
883
919
 
884
 
        This clears the _current_request on self._medium to allow a new 
 
920
        This clears the _current_request on self._medium to allow a new
885
921
        request to be created.
886
922
        """
887
923
        if self._medium._current_request is not self:
888
924
            raise AssertionError()
889
925
        self._medium._current_request = None
890
 
        
 
926
 
891
927
    def _finished_writing(self):
892
928
        """See SmartClientMediumRequest._finished_writing.
893
929