3
 
# $Id: http_client.py 271 2004-10-09 10:50:59Z fredrik $
 
4
 
# a simple asynchronous http client (based on SimpleAsyncHTTP.py from
 
5
 
# "Python Standard Library" by Fredrik Lundh, O'Reilly 2001)
 
7
 
# HTTP/1.1 and GZIP support added in January 2003 by Fredrik Lundh.
 
10
 
# 2004-08-26 fl   unified http callback
 
11
 
# 2004-10-09 fl   factored out gzip_consumer support
 
12
 
# 2005-07-08 mbp  experimental support for keepalive connections
 
14
 
# Copyright (c) 2001-2004 by Fredrik Lundh.  All rights reserved.
 
19
 
"""async/pipelined http client
 
24
 
Users of this library pass in URLs they want to see, and consumer
 
25
 
objects that will receive the results at some point in the future.
 
26
 
Any number of requests may be queued up, and more may be added while
 
27
 
the download is in progress.
 
29
 
Requests can be both superscalar and superpipelined.  That is to say,
 
30
 
for each server there can be multiple sockets open, and each socket
 
31
 
may have more than one request in flight.
 
36
 
There is a single DownloadManager, and a connection object for each
 
39
 
Request/consumer pairs are maintained in queues.  Each connection has
 
40
 
a list of transmitted requests whose response has not yet been
 
41
 
received.  There is also a per-server list of requests that have not
 
44
 
When a connection is ready to transmit a new request, it takes one
 
45
 
from the unsubmitted list, sends the request, and adds the request to
 
46
 
its unfulfilled list.  This should happen when the connection has
 
47
 
space for more transmissions or when a new request is added by the
 
48
 
user.  If the connection terminates with unfulfilled requests they are
 
49
 
put back onto the unsubmitted list, to be retried elsewhere.
 
51
 
Because responses come back precisely in order, the connection always
 
52
 
knows what it should expect next: the response for the next
 
56
 
# Note that (as of ubuntu python 2.4.1) every socket.connect() call
 
57
 
# with a hostname does a remote DNS resolution, which is pretty sucky.
 
58
 
# Shouldn't there be a cache in glibc?  We should probably cache the
 
59
 
# address in, say, the DownloadManager.
 
61
 
# TODO: A default consumer operation that writes the received data
 
62
 
# into a file; by default the file is named the same as the last
 
63
 
# component of the URL.
 
65
 
# TODO: A utility function that is given a list of URLs, and downloads
 
66
 
# them all parallel/pipelined.  If any fail, it raises an exception
 
67
 
# (and discards the rest), or perhaps can be told to continue anyhow.
 
68
 
# The content is written into temporary files.  It returns a list of
 
69
 
# readable file objects.
 
71
 
# TODO: If we try pipelined or keepalive and the connection drop out
 
72
 
# then retry the request on a new connection; eventually we should perhaps
 
73
 
# learn that a given host or network just won't allow keepalive.
 
77
 
import socket, string, time, sys
 
79
 
import mimetools, urlparse, urllib
 
82
 
logging.basicConfig(level=logging.DEBUG,
 
83
 
                    format='%(asctime)s %(levelname)s %(message)s',
 
84
 
                    filename='/tmp/http_client.log',
 
87
 
logger = logging.getLogger('bzr.http_client')
 
94
 
# Close connection.   Request handlers can raise this exception to
 
95
 
# indicate that the connection should be closed.
 
97
 
class CloseConnection(Exception):
 
101
 
# Redirect connection.  Request handlers can raise this exception to
 
102
 
# indicate that the a new request should be issued.
 
104
 
class Redirect(CloseConnection):
 
105
 
    def __init__(self, location):
 
106
 
        self.location = location
 
109
 
class DownloadManager(object):
 
110
 
    """Handles pipelined/overlapped downloads.
 
112
 
    Pass in a series of URLs with handlers to receive the response.
 
113
 
    This object will spread the requests over however many sockets
 
117
 
        Requests not assigned to any channel
 
120
 
        Currently assigned to a channel
 
123
 
        self.queued_requests = []
 
124
 
        # self.channel = HttpChannel('localhost', 8000, self)
 
126
 
        self.try_pipelined = False
 
127
 
        self.try_keepalive = False
 
128
 
        self.max_channels = 5
 
131
 
    def enqueue(self, url, consumer):
 
132
 
        self.queued_requests.append((url, consumer))
 
133
 
        self._wake_up_channel()
 
136
 
    def _channel_closed(self, channel):
 
137
 
        """Called by the channel when its socket closes.
 
139
 
        self.channels.remove(channel)
 
140
 
        if self.queued_requests:
 
142
 
            self._wake_up_channel()
 
145
 
    def _make_channel(self):
 
146
 
        # proxy2 203.17.154.69
 
147
 
        # return HttpChannel('82.211.81.161', 80, self)         # bazaar-ng.org 
 
148
 
        # return HttpChannel('203.17.154.69', 8080, self)
 
149
 
        return HttpChannel('127.0.0.1', 8000, self)  # forwarded
 
152
 
    def _wake_up_channel(self):
 
153
 
        """Try to wake up one channel to send the newly-added request.
 
155
 
        There may be more than one request pending, and this may cause
 
156
 
        more than one channel to take requests.  That's OK; some of
 
157
 
        them may be frustrated.
 
159
 
        from random import shuffle, choice
 
161
 
        # first, wake up any idle channels
 
163
 
        for ch in self.channels:
 
164
 
            if not ch.sent_requests:
 
168
 
            debug("woke existing idle channel(s)")
 
171
 
        if len(self.channels) < self.max_channels:
 
172
 
            newch = self._make_channel()
 
173
 
            self.channels.append(newch)
 
175
 
            debug("created new channel")
 
178
 
        if self.try_pipelined:
 
179
 
            # ask existing channels to take it
 
180
 
            debug("woke busy channel")
 
181
 
            choice(self.channels).take_one()
 
184
 
        # debug("request postponed until a channel's idle")
 
190
 
        """Run until all outstanding requests have been served."""
 
191
 
        #while self.running_requests or self.queued_requests \
 
192
 
        #          or not self.channel.is_idle():
 
193
 
        #    asyncore.loop(count=1)
 
198
 
class Response(object):
 
199
 
    """Holds in-flight response."""
 
203
 
def _parse_response_http10(header):
 
204
 
    from cStringIO import StringIO
 
206
 
    fp = StringIO(header)
 
209
 
    r.status = fp.readline().split(" ", 2)
 
210
 
    r.headers = mimetools.Message(fp)
 
212
 
    # we can only(?) expect to do keepalive if we got either a 
 
213
 
    # content-length or chunked encoding; otherwise there's no way to know
 
214
 
    # when the content ends apart from through the connection close
 
215
 
    r.content_type = r.headers.get("content-type")
 
217
 
        r.content_length = int(r.headers.get("content-length"))
 
218
 
    except (ValueError, TypeError):
 
219
 
        r.content_length = None
 
220
 
    debug("seen content length of %r" % r.content_length)
 
222
 
    r.transfer_encoding = r.headers.get("transfer-encoding")
 
223
 
    r.content_encoding = r.headers.get("content-encoding")
 
224
 
    r.connection_reply = r.headers.get("connection")
 
226
 
    # TODO: pass status code to consumer?
 
228
 
    if r.transfer_encoding:
 
229
 
        raise NotImplementedError()
 
231
 
    if r.transfer_encoding:
 
232
 
        raise NotImplementedError()
 
234
 
    if int(r.status[1]) != 200:
 
235
 
        debug("can't handle response status %r" % r.status)
 
236
 
        raise NotImplementedError()
 
238
 
    if r.content_length is None:
 
239
 
        raise NotImplementedError()
 
241
 
    if r.content_length == 0:
 
242
 
        raise NotImplementedError()
 
244
 
    r.content_remaining = r.content_length                
 
254
 
class HttpChannel(asyncore.dispatcher_with_send):
 
255
 
    """One http socket, pipelining if possible."""
 
256
 
    # asynchronous http client
 
258
 
    user_agent = "http_client.py 1.3ka (based on effbot)"
 
260
 
    proxies = urllib.getproxies()
 
262
 
    def __init__(self, ip_host, ip_port, manager):
 
263
 
        asyncore.dispatcher_with_send.__init__(self)
 
264
 
        self.manager = manager
 
266
 
        # if a response header has been seen, this holds it
 
271
 
        self.chunk_size = None
 
273
 
        self.timestamp = time.time()
 
275
 
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
 
276
 
        debug('connecting...')
 
277
 
        self.connect((ip_host, ip_port))
 
279
 
        # sent_requests holds (url, consumer) 
 
280
 
        self.sent_requests = []
 
286
 
        return 'HttpChannel(local_port=%r)' % (self.getsockname(),)
 
290
 
        return (not self.sent_requests)
 
293
 
    def handle_connect(self):
 
299
 
        """Accept one request from the manager if possible."""
 
300
 
        if self.manager.try_pipelined:
 
301
 
            if len(self.sent_requests) > 4:
 
304
 
            if len(self.sent_requests) > 0:
 
308
 
            url, consumer = self.manager.queued_requests.pop(0)
 
309
 
            debug('request accepted by channel')
 
313
 
        # TODO: If there are too many already in flight, don't take one.
 
314
 
        # TODO: If the socket's not writable (tx buffer full), don't take.
 
315
 
        self._push_request_http10(url, consumer)
 
319
 
    def _push_request_http10(self, url, consumer):
 
320
 
        """Send a request, and add it to the outstanding queue."""
 
321
 
        # TODO: check the url requested is appropriate for this connection
 
323
 
        # TODO: If there are too many requests outstanding or (less likely) the 
 
324
 
        # connection fails, queue it for later use.
 
326
 
        # TODO: Keep track of requests that have been sent but not yet fulfilled,
 
327
 
        # because we might need to retransmit them if the connection fails. (Or
 
328
 
        # should the caller do that?)
 
330
 
        request = self._form_request_http10(url)
 
331
 
        debug('send request for %s from %r' % (url, self))
 
333
 
        # dispatcher_with_send handles buffering the data until it can
 
334
 
        # be written, and hooks handle_write.
 
338
 
        self.sent_requests.append((url, consumer))
 
341
 
    def _form_request_http10(self, url):
 
342
 
        # TODO: get right vhost name
 
344
 
            "GET %s HTTP/1.0" % (url),
 
345
 
            "Host: www.bazaar-ng.org",
 
348
 
        if self.manager.try_keepalive or self.manager.try_pipelined:
 
351
 
                "Connection: keep-alive",
 
354
 
        # make sure to include a user agent
 
355
 
        for header in request:
 
356
 
            if string.lower(header).startswith("user-agent:"):
 
359
 
            request.append("User-Agent: %s" % self.user_agent)
 
361
 
        return string.join(request, "\r\n") + "\r\n\r\n"
 
364
 
    def handle_read(self):
 
365
 
        # handle incoming data
 
366
 
        data = self.recv(2048)
 
368
 
        self.data = self.data + data
 
371
 
            debug('got %d bytes from socket' % len(data))
 
373
 
            debug('server closed connection')
 
376
 
            consumer = self.sent_requests[0][1]
 
377
 
            if not self.response:
 
378
 
                # do not have a full response header yet
 
380
 
                # check if we've seen a full header
 
381
 
                debug('getting header for %s' % self.sent_requests[0][0])
 
383
 
                header = self.data.split("\r\n\r\n", 1)
 
386
 
                header, self.data = header
 
388
 
                self.response = _parse_response_http10(header)
 
389
 
                self.content_remaining = self.response.content_length
 
394
 
            # we now know how many (more) content bytes we have, and how much
 
395
 
            # is in the data buffer. there are two main possibilities:
 
396
 
            # too much data, and some must be left behind containing the next
 
397
 
            # response headers, or too little, or possibly just right
 
399
 
            want = self.content_remaining
 
401
 
                got_data = self.data[:want]
 
402
 
                self.data = self.data[want:]
 
406
 
                self.content_remaining -= len(got_data)
 
408
 
                debug('pass back %d bytes of %s, %d remain'
 
410
 
                         self.sent_requests[0][0],
 
411
 
                         self.content_remaining))
 
414
 
            if self.content_remaining == 0:
 
415
 
                del self.sent_requests[0]
 
417
 
                debug('content complete')
 
418
 
                consumer.content_complete()
 
420
 
                # reset lots of things and try to get the next response header
 
421
 
                if self.response.connection_reply == 'close':
 
422
 
                    debug('server requested close')
 
423
 
                    self.manager._channel_closed(self)
 
425
 
                elif not self.manager.try_keepalive:
 
426
 
                    debug('no keepalive for this socket')
 
427
 
                    self.manager._channel_closed(self)
 
430
 
                    debug("ready for next header...")
 
436
 
    def handle_close(self):
 
437
 
        debug('async told us of close on %r' % self)
 
438
 
        # if there are outstanding requests should probably reopen and 
 
439
 
        # retransmit, but if we're not making any progress then give up
 
440
 
        self.manager._channel_closed(self)
 
445
 
    def __init__(self, url, pb):
 
450
 
    def feed(self, data):
 
451
 
        # print "feed", repr(data)
 
452
 
        # print "feed", repr(data[:20]), repr(data[-20:]), len(data)
 
454
 
            base = self.url[self.url.rindex('/')+1:]
 
455
 
            self.outf = file('/tmp/download/' + base, 'wb')
 
456
 
        self.outf.write(data)
 
458
 
    def error(self, err_info):
 
460
 
        error('error reported to consumer')
 
461
 
        traceback.print_exception(err_info[0], err_info[1], err_info[2])
 
464
 
    def content_complete(self):
 
465
 
        info('content complete from %s' % self.url)
 
468
 
        # using last_cnt is cheating
 
469
 
        self._pb.update('downloading inventory',
 
475
 
if __name__ == "__main__":
 
476
 
    logging.basicConfig(level=logging.DEBUG)
 
478
 
    mgr = DownloadManager()
 
480
 
    from bzrlib.branch import Branch
 
481
 
    from bzrlib.progress import ProgressBar
 
484
 
    revs = Branch('/home/mbp/work/bzr').revision_history()
 
485
 
    pb.update('downloading inventories', 0, len(revs))
 
488
 
        url = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/inventory-store/' \
 
490
 
        mgr.enqueue(url, DummyConsumer(url, pb))
 
497
 
#     for url in ['http://www.bazaar-ng.org/',
 
498
 
#                 'http://www.bazaar-ng.org/tutorial.html',
 
499
 
#                 'http://www.bazaar-ng.org/download.html',
 
500
 
#                 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/mbp@hope-20050415013653-3b3c9c3d33fae0a6.gz',