3
# $Id: http_client.py 271 2004-10-09 10:50:59Z fredrik $
 
 
4
# a simple asynchronous http client (based on SimpleAsyncHTTP.py from
 
 
5
# "Python Standard Library" by Fredrik Lundh, O'Reilly 2001)
 
 
7
# HTTP/1.1 and GZIP support added in January 2003 by Fredrik Lundh.
 
 
10
# 2004-08-26 fl   unified http callback
 
 
11
# 2004-10-09 fl   factored out gzip_consumer support
 
 
12
# 2005-07-08 mbp  experimental support for keepalive connections
 
 
14
# Copyright (c) 2001-2004 by Fredrik Lundh.  All rights reserved.
 
 
19
"""async/pipelined http client
 
 
24
Users of this library pass in URLs they want to see, and consumer
 
 
25
objects that will receive the results at some point in the future.
 
 
26
Any number of requests may be queued up, and more may be added while
 
 
27
the download is in progress.
 
 
29
Requests can be both superscalar and superpipelined.  That is to say,
 
 
30
for each server there can be multiple sockets open, and each socket
 
 
31
may have more than one request in flight.
 
 
36
There is a single DownloadManager, and a connection object for each
 
 
39
Request/consumer pairs are maintained in queues.  Each connection has
 
 
40
a list of transmitted requests whose response has not yet been
 
 
41
received.  There is also a per-server list of requests that have not
 
 
44
When a connection is ready to transmit a new request, it takes one
 
 
45
from the unsubmitted list, sends the request, and adds the request to
 
 
46
its unfulfilled list.  This should happen when the connection has
 
 
47
space for more transmissions or when a new request is added by the
 
 
48
user.  If the connection terminates with unfulfilled requests they are
 
 
49
put back onto the unsubmitted list, to be retried elsewhere.
 
 
51
Because responses come back precisely in order, the connection always
 
 
52
knows what it should expect next: the response for the next
 
 
56
# Note that (as of ubuntu python 2.4.1) every socket.connect() call
 
 
57
# with a hostname does a remote DNS resolution, which is pretty sucky.
 
 
58
# Shouldn't there be a cache in glibc?  We should probably cache the
 
 
59
# address in, say, the DownloadManager.
 
 
61
# TODO: A default consumer operation that writes the received data
 
 
62
# into a file; by default the file is named the same as the last
 
 
63
# component of the URL.
 
 
65
# TODO: A utility function that is given a list of URLs, and downloads
 
 
66
# them all parallel/pipelined.  If any fail, it raises an exception
 
 
67
# (and discards the rest), or perhaps can be told to continue anyhow.
 
 
68
# The content is written into temporary files.  It returns a list of
 
 
69
# readable file objects.
 
 
72
import socket, string, time, sys
 
 
74
import mimetools, urlparse, urllib
 
 
77
logger = logging.getLogger('bzr.http_client')
 
 
84
# Close connection.   Request handlers can raise this exception to
 
 
85
# indicate that the connection should be closed.
 
 
87
class CloseConnection(Exception):
 
 
91
# Redirect connection.  Request handlers can raise this exception to
 
 
92
# indicate that the a new request should be issued.
 
 
94
class Redirect(CloseConnection):
 
 
95
    def __init__(self, location):
 
 
96
        self.location = location
 
 
99
class DownloadManager(object):
 
 
100
    """Handles pipelined/overlapped downloads.
 
 
102
    Pass in a series of URLs with handlers to receive the response.
 
 
103
    This object will spread the requests over however many sockets
 
 
107
        Requests not assigned to any channel
 
 
110
        Currently assigned to a channel
 
 
113
        self.queued_requests = []
 
 
114
        # self.channel = HttpChannel('localhost', 8000, self)
 
 
116
        self.try_pipelined = False
 
 
117
        self.try_keepalive = False
 
 
118
        self.max_channels = 3
 
 
121
    def enqueue(self, url, consumer):
 
 
122
        self.queued_requests.append((url, consumer))
 
 
123
        self._wake_up_channel()
 
 
126
    def _channel_closed(self, channel):
 
 
127
        """Called by the channel when its socket closes.
 
 
129
        self.channels.remove(channel)
 
 
130
        if self.queued_requests:
 
 
132
            self._wake_up_channel()
 
 
135
    def _make_channel(self):
 
 
136
        # proxy2 203.17.154.69
 
 
138
        return HttpChannel('82.211.81.161', 80, self)
 
 
139
        # return HttpChannel('203.17.154.69', 8080, self)
 
 
140
        # return HttpChannel('localhost', 8000, self)
 
 
143
    def _wake_up_channel(self):
 
 
144
        """Try to wake up one channel to send the newly-added request.
 
 
146
        There may be more than one request pending, and this may cause
 
 
147
        more than one channel to take requests.  That's OK; some of
 
 
148
        them may be frustrated.
 
 
150
        from random import shuffle, choice
 
 
152
        # first, wake up any idle channels
 
 
154
        for ch in self.channels:
 
 
155
            if not ch.sent_requests:
 
 
159
            debug("woke existing idle channel(s)")
 
 
162
        if len(self.channels) < self.max_channels:
 
 
163
            newch = self._make_channel()
 
 
164
            self.channels.append(newch)
 
 
166
            debug("created new channel")
 
 
169
        if self.try_pipelined:
 
 
170
            # ask existing channels to take it
 
 
171
            debug("woke busy channel")
 
 
172
            choice(self.channels).take_one()
 
 
175
        debug("request left until a channel's idle")
 
 
181
        """Run until all outstanding requests have been served."""
 
 
182
        #while self.running_requests or self.queued_requests \
 
 
183
        #          or not self.channel.is_idle():
 
 
184
        #    asyncore.loop(count=1)
 
 
189
class Response(object):
 
 
190
    """Holds in-flight response."""
 
 
194
def _parse_response_http10(header):
 
 
195
    from cStringIO import StringIO
 
 
197
    fp = StringIO(header)
 
 
200
    r.status = fp.readline().split(" ", 2)
 
 
201
    r.headers = mimetools.Message(fp)
 
 
203
    # we can only(?) expect to do keepalive if we got either a 
 
 
204
    # content-length or chunked encoding; otherwise there's no way to know
 
 
205
    # when the content ends apart from through the connection close
 
 
206
    r.content_type = r.headers.get("content-type")
 
 
208
        r.content_length = int(r.headers.get("content-length"))
 
 
209
    except (ValueError, TypeError):
 
 
210
        r.content_length = None
 
 
211
    debug("seen content length of %r" % r.content_length)
 
 
213
    r.transfer_encoding = r.headers.get("transfer-encoding")
 
 
214
    r.content_encoding = r.headers.get("content-encoding")
 
 
215
    r.connection_reply = r.headers.get("connection")
 
 
217
    # TODO: pass status code to consumer?
 
 
219
    if r.transfer_encoding:
 
 
220
        raise NotImplementedError()
 
 
222
    if r.transfer_encoding:
 
 
223
        raise NotImplementedError()
 
 
225
    if int(r.status[1]) != 200:
 
 
226
        debug("can't handle response status %r" % r.status)
 
 
227
        raise NotImplementedError()
 
 
229
    if r.content_length == None:
 
 
230
        raise NotImplementedError()
 
 
232
    if r.content_length == 0:
 
 
233
        raise NotImplementedError()
 
 
235
    r.content_remaining = r.content_length                
 
 
245
class HttpChannel(asyncore.dispatcher_with_send):
 
 
246
    """One http socket, pipelining if possible."""
 
 
247
    # asynchronous http client
 
 
249
    user_agent = "http_client.py 1.3ka (based on effbot)"
 
 
251
    proxies = urllib.getproxies()
 
 
253
    def __init__(self, ip_host, ip_port, manager):
 
 
254
        asyncore.dispatcher_with_send.__init__(self)
 
 
255
        self.manager = manager
 
 
257
        # if a response header has been seen, this holds it
 
 
262
        self.chunk_size = None
 
 
264
        self.timestamp = time.time()
 
 
266
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
 
 
267
        debug('connecting...')
 
 
268
        self.connect((ip_host, ip_port))
 
 
270
        # sent_requests holds (url, consumer) 
 
 
271
        self.sent_requests = []
 
 
277
        return 'HttpChannel(local_port=%r)' % (self.getsockname(),)
 
 
281
        return (not self.sent_requests)
 
 
284
    def handle_connect(self):
 
 
290
        """Accept one request from the manager if possible."""
 
 
291
        if self.manager.try_pipelined:
 
 
292
            if len(self.sent_requests) > 4:
 
 
295
            if len(self.sent_requests) > 0:
 
 
299
            url, consumer = self.manager.queued_requests.pop(0)
 
 
300
            debug('request accepted by channel')
 
 
304
        # TODO: If there are too many already in flight, don't take one.
 
 
305
        # TODO: If the socket's not writable (tx buffer full), don't take.
 
 
306
        self._push_request_http10(url, consumer)
 
 
310
    def _push_request_http10(self, url, consumer):
 
 
311
        """Send a request, and add it to the outstanding queue."""
 
 
312
        # TODO: check the url requested is appropriate for this connection
 
 
314
        # TODO: If there are too many requests outstanding or (less likely) the 
 
 
315
        # connection fails, queue it for later use.
 
 
317
        # TODO: Keep track of requests that have been sent but not yet fulfilled,
 
 
318
        # because we might need to retransmit them if the connection fails. (Or
 
 
319
        # should the caller do that?)
 
 
321
        request = self._form_request_http10(url)
 
 
322
        debug('send request for %s from %r' % (url, self))
 
 
324
        # dispatcher_with_send handles buffering the data until it can
 
 
325
        # be written, and hooks handle_write.
 
 
329
        self.sent_requests.append((url, consumer))
 
 
332
    def _form_request_http10(self, url):
 
 
333
        # TODO: get right vhost name
 
 
335
            "GET %s HTTP/1.0" % (url),
 
 
336
            "Host: www.bazaar-ng.org",
 
 
339
        if self.manager.try_keepalive or self.manager.try_pipelined:
 
 
342
                "Connection: keep-alive",
 
 
345
        # make sure to include a user agent
 
 
346
        for header in request:
 
 
347
            if string.lower(header).startswith("user-agent:"):
 
 
350
            request.append("User-Agent: %s" % self.user_agent)
 
 
352
        return string.join(request, "\r\n") + "\r\n\r\n"
 
 
355
    def handle_read(self):
 
 
356
        # handle incoming data
 
 
357
        data = self.recv(2048)
 
 
359
        self.data = self.data + data
 
 
362
            debug('got %d bytes from socket' % len(data))
 
 
364
            debug('server closed connection')
 
 
367
            consumer = self.sent_requests[0][1]
 
 
368
            if not self.response:
 
 
369
                # do not have a full response header yet
 
 
371
                # check if we've seen a full header
 
 
372
                debug('getting header for %s' % self.sent_requests[0][0])
 
 
374
                header = self.data.split("\r\n\r\n", 1)
 
 
377
                header, self.data = header
 
 
379
                self.response = _parse_response_http10(header)
 
 
380
                self.content_remaining = self.response.content_length
 
 
385
            # we now know how many (more) content bytes we have, and how much
 
 
386
            # is in the data buffer. there are two main possibilities:
 
 
387
            # too much data, and some must be left behind containing the next
 
 
388
            # response headers, or too little, or possibly just right
 
 
390
            want = self.content_remaining
 
 
392
                got_data = self.data[:want]
 
 
393
                self.data = self.data[want:]
 
 
397
                debug('pass back %d bytes of %s' % (len(got_data),
 
 
398
                                                    self.sent_requests[0][0]))
 
 
401
                self.content_remaining -= len(got_data)
 
 
403
            if self.content_remaining == 0:
 
 
404
                del self.sent_requests[0]
 
 
406
                # reset lots of things and try to get the next response header
 
 
407
                if self.response.connection_reply == 'close':
 
 
408
                    debug('server requested close')
 
 
409
                    self.manager._channel_closed(self)
 
 
411
                elif not self.manager.try_keepalive:
 
 
412
                    debug('no keepalive for this socket')
 
 
413
                    self.manager._channel_closed(self)
 
 
416
                    debug("ready for next header...")
 
 
417
                    consumer.content_complete()
 
 
423
    def handle_close(self):
 
 
424
        debug('async told us of close on %r' % self)
 
 
425
        # if there are outstanding requests should probably reopen and 
 
 
426
        # retransmit, but if we're not making any progress then give up
 
 
427
        self.manager._channel_closed(self)
 
 
431
if __name__ == "__main__":
 
 
432
    class dummy_consumer:
 
 
433
        def __init__(self, url):
 
 
436
        def feed(self, data):
 
 
437
            # print "feed", repr(data)
 
 
438
            # print "feed", repr(data[:20]), repr(data[-20:]), len(data)
 
 
441
        def error(self, err_info):
 
 
443
            traceback.print_exception(err_info[0], err_info[1], err_info[2])
 
 
445
        def content_complete(self):
 
 
446
            debug('content complete from %s' % self.url)
 
 
449
    logging.basicConfig(level=logging.DEBUG)
 
 
451
    mgr = DownloadManager()
 
 
453
    from bzrlib.branch import Branch
 
 
454
    revs = Branch('/home/mbp/work/bzr').revision_history()
 
 
458
#     for url in ['http://www.bazaar-ng.org/',
 
 
459
#                 'http://www.bazaar-ng.org/tutorial.html',
 
 
460
#                 'http://www.bazaar-ng.org/download.html',
 
 
461
#                 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/mbp@hope-20050415013653-3b3c9c3d33fae0a6.gz',
 
 
465
#        url = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/' \
 
 
466
        url = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/inventory-store/' \
 
 
468
        mgr.enqueue(url, dummy_consumer(url))