#! /usr/bin/python

# $Id: http_client.py 271 2004-10-09 10:50:59Z fredrik $
# a simple asynchronous http client (based on SimpleAsyncHTTP.py from
# "Python Standard Library" by Fredrik Lundh, O'Reilly 2001)
#
# HTTP/1.1 and GZIP support added in January 2003 by Fredrik Lundh.
#
# changes:
# 2004-08-26 fl   unified http callback
# 2004-10-09 fl   factored out gzip_consumer support
# 2005-07-08 mbp  experimental support for keepalive connections
#
# Copyright (c) 2001-2004 by Fredrik Lundh.  All rights reserved.
#

"""async/pipelined http client
 | 
|
20  | 
||
21  | 
Use
 | 
|
22  | 
===
 | 
|
23  | 
||
24  | 
Users of this library pass in URLs they want to see, and consumer
 | 
|
25  | 
objects that will receive the results at some point in the future.
 | 
|
26  | 
Any number of requests may be queued up, and more may be added while
 | 
|
27  | 
the download is in progress.
 | 
|
28  | 
||
29  | 
Requests can be both superscalar and superpipelined.  That is to say,
 | 
|
30  | 
for each server there can be multiple sockets open, and each socket
 | 
|
31  | 
may have more than one request in flight.
 | 
|
32  | 
||
33  | 
Design
 | 
|
34  | 
======
 | 
|
35  | 
||
36  | 
There is a single DownloadManager, and a connection object for each
 | 
|
37  | 
open socket.
 | 
|
38  | 
||
39  | 
Request/consumer pairs are maintained in queues.  Each connection has
 | 
|
40  | 
a list of transmitted requests whose response has not yet been
 | 
|
41  | 
received.  There is also a per-server list of requests that have not
 | 
|
42  | 
yet been submitted.
 | 
|
43  | 
||
44  | 
When a connection is ready to transmit a new request, it takes one
 | 
|
45  | 
from the unsubmitted list, sends the request, and adds the request to
 | 
|
46  | 
its unfulfilled list.  This should happen when the connection has
 | 
|
47  | 
space for more transmissions or when a new request is added by the
 | 
|
48  | 
user.  If the connection terminates with unfulfilled requests they are
 | 
|
49  | 
put back onto the unsubmitted list, to be retried elsewhere.
 | 
|
50  | 
||
51  | 
Because responses come back precisely in order, the connection always
 | 
|
52  | 
knows what it should expect next: the response for the next
 | 
|
53  | 
unfulfilled request.
 | 
|
54  | 
"""

# Note that (as of ubuntu python 2.4.1) every socket.connect() call
# with a hostname does a remote DNS resolution, which is pretty sucky.
# Shouldn't there be a cache in glibc?  We should probably cache the
# address in, say, the DownloadManager.

# TODO: A default consumer operation that writes the received data
# into a file; by default the file is named the same as the last
# component of the URL.
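#
# A sketch of what that consumer might look like (FileConsumer is a
# hypothetical name; feed and content_complete follow the protocol
# DummyConsumer below implements):
#
#     class FileConsumer:
#         def __init__(self, url):
#             base = url[url.rindex('/')+1:]
#             self.outf = file(base, 'wb')
#         def feed(self, data):
#             self.outf.write(data)
#         def content_complete(self):
#             self.outf.close()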

# TODO: A utility function that is given a list of URLs, and downloads
# them all parallel/pipelined.  If any fail, it raises an exception
# (and discards the rest), or perhaps can be told to continue anyhow.
# The content is written into temporary files.  It returns a list of
# readable file objects.
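#
# A sketch of that helper under the same assumptions (download_all and
# TempfileConsumer are hypothetical names; TempfileConsumer would feed
# data into a tempfile.TemporaryFile held in its outf attribute and
# rewind it on content_complete):
#
#     def download_all(urls):
#         mgr = DownloadManager()
#         consumers = [TempfileConsumer(url) for url in urls]
#         for url, consumer in zip(urls, consumers):
#             mgr.enqueue(url, consumer)
#         mgr.run()
#         return [c.outf for c in consumers]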

# TODO: If we try pipelined or keepalive and the connection drops out,
# then retry the request on a new connection; eventually we should
# perhaps learn that a given host or network just won't allow keepalive.

import asyncore
import socket, string, time, sys
import StringIO
import mimetools, urlparse, urllib
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(levelname)s %(message)s',
                    filename='/tmp/http_client.log',
                    filemode='w')

logger = logging.getLogger('bzr.http_client')
debug = logger.debug
info = logger.info
error = logger.error


##
# Close connection.  Request handlers can raise this exception to
# indicate that the connection should be closed.

class CloseConnection(Exception):
    pass

##
# Redirect connection.  Request handlers can raise this exception to
# indicate that a new request should be issued.

class Redirect(CloseConnection):
    def __init__(self, location):
        self.location = location


class DownloadManager(object):
    """Handles pipelined/overlapped downloads.

    Pass in a series of URLs with handlers to receive the response.
    This object will spread the requests over however many sockets
    seem useful.

    queued_requests
        Requests not assigned to any channel

    running_requests
        Currently assigned to a channel
    """
    def __init__(self):
        self.queued_requests = []
        # self.channel = HttpChannel('localhost', 8000, self)
        self.channels = []
        self.try_pipelined = False
        self.try_keepalive = False
        self.max_channels = 5


    def enqueue(self, url, consumer):
        self.queued_requests.append((url, consumer))
        self._wake_up_channel()


    def _channel_closed(self, channel):
        """Called by the channel when its socket closes."""
        self.channels.remove(channel)
        if self.queued_requests:
            # might recreate one
            self._wake_up_channel()


    def _make_channel(self):
        # proxy2 203.17.154.69
        # return HttpChannel('82.211.81.161', 80, self)         # bazaar-ng.org
        # return HttpChannel('203.17.154.69', 8080, self)
        return HttpChannel('127.0.0.1', 8000, self) # forwarded


    def _wake_up_channel(self):
        """Try to wake up one channel to send the newly-added request.

        There may be more than one request pending, and this may cause
        more than one channel to take requests.  That's OK; some of
        them may be frustrated.
        """
        from random import choice

        # first, wake up any idle channels
        done = False
        for ch in self.channels:
            if not ch.sent_requests:
                ch.take_one()
                done = True
        if done:
            debug("woke existing idle channel(s)")
            return

        if len(self.channels) < self.max_channels:
            newch = self._make_channel()
            self.channels.append(newch)
            newch.take_one()
            debug("created new channel")
            return

        if self.try_pipelined:
            # ask existing channels to take it
            debug("woke busy channel")
            choice(self.channels).take_one()

        # debug("request postponed until a channel's idle")


    def run(self):
        """Run until all outstanding requests have been served."""
        #while self.running_requests or self.queued_requests \
        #          or not self.channel.is_idle():
        #    asyncore.loop(count=1)
        asyncore.loop()


class Response(object):
    """Holds in-flight response."""


def _parse_response_http10(header):
    from cStringIO import StringIO

    fp = StringIO(header)
    r = Response()

    r.status = fp.readline().split(" ", 2)
    r.headers = mimetools.Message(fp)

    # we can only(?) expect to do keepalive if we got either a
    # content-length or chunked encoding; otherwise there's no way to
    # know when the content ends apart from through the connection close
    r.content_type = r.headers.get("content-type")
    try:
        r.content_length = int(r.headers.get("content-length"))
    except (ValueError, TypeError):
        r.content_length = None
    debug("seen content length of %r" % r.content_length)

    r.transfer_encoding = r.headers.get("transfer-encoding")
    r.content_encoding = r.headers.get("content-encoding")
    r.connection_reply = r.headers.get("connection")

    # TODO: pass status code to consumer?

    # neither chunked transfer encoding nor compressed content is
    # handled yet
    if r.transfer_encoding:
        raise NotImplementedError()

    if r.content_encoding:
        raise NotImplementedError()

    if int(r.status[1]) != 200:
        debug("can't handle response status %r" % r.status)
        raise NotImplementedError()

    if r.content_length is None:
        raise NotImplementedError()

    if r.content_length == 0:
        raise NotImplementedError()

    r.content_remaining = r.content_length

    return r
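
# For reference, a response header that the parser above accepts looks
# like this (an illustrative example, not captured output):
#
#     HTTP/1.0 200 OK
#     Content-Type: application/x-gzip
#     Content-Length: 1234
#     Connection: keep-alive
#
# A transfer-encoding or content-encoding header, a non-200 status, or
# a missing or zero content-length all raise NotImplementedError above.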


class HttpChannel(asyncore.dispatcher_with_send):
    """One http socket, pipelining if possible."""
    # asynchronous http client

    user_agent = "http_client.py 1.3ka (based on effbot)"

    proxies = urllib.getproxies()

    def __init__(self, ip_host, ip_port, manager):
        asyncore.dispatcher_with_send.__init__(self)
        self.manager = manager

        # if a response header has been seen, this holds it
        self.response = None

        self.data = ""

        self.chunk_size = None

        self.timestamp = time.time()

        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        debug('connecting...')
        self.connect((ip_host, ip_port))

        # sent_requests holds (url, consumer)
        self.sent_requests = []

        self._outbuf = ''


    def __repr__(self):
        return 'HttpChannel(local_port=%r)' % (self.getsockname(),)


    def is_idle(self):
        return (not self.sent_requests)


    def handle_connect(self):
        debug("connected")
        self.take_one()


    def take_one(self):
        """Accept one request from the manager if possible."""
        if self.manager.try_pipelined:
            if len(self.sent_requests) > 4:
                return
        else:
            if len(self.sent_requests) > 0:
                return

        try:
            url, consumer = self.manager.queued_requests.pop(0)
            debug('request accepted by channel')
        except IndexError:
            return

        # TODO: If there are too many already in flight, don't take one.
        # TODO: If the socket's not writable (tx buffer full), don't take.
        self._push_request_http10(url, consumer)


    def _push_request_http10(self, url, consumer):
        """Send a request, and add it to the outstanding queue."""
        # TODO: check the url requested is appropriate for this connection

        # TODO: If there are too many requests outstanding or (less likely)
        # the connection fails, queue it for later use.

        # TODO: Keep track of requests that have been sent but not yet
        # fulfilled, because we might need to retransmit them if the
        # connection fails. (Or should the caller do that?)

        request = self._form_request_http10(url)
        debug('send request for %s from %r' % (url, self))

        # dispatcher_with_send handles buffering the data until it can
        # be written, and hooks handle_write.
        self.send(request)

        self.sent_requests.append((url, consumer))


    def _form_request_http10(self, url):
        # TODO: get right vhost name
        request = [
            "GET %s HTTP/1.0" % (url),
            "Host: www.bazaar-ng.org",
            ]

        if self.manager.try_keepalive or self.manager.try_pipelined:
            request.extend([
                "Keep-Alive: 60",
                "Connection: keep-alive",
                ])

        # make sure to include a user agent
        for header in request:
            if string.lower(header).startswith("user-agent:"):
                break
        else:
            request.append("User-Agent: %s" % self.user_agent)

        return string.join(request, "\r\n") + "\r\n\r\n"
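
    # For reference, with keepalive enabled the request this produces
    # looks like the following (the path is a made-up example; each line
    # ends in CRLF, and a blank line terminates the header):
    #
    #     GET /index.html HTTP/1.0
    #     Host: www.bazaar-ng.org
    #     Keep-Alive: 60
    #     Connection: keep-alive
    #     User-Agent: http_client.py 1.3ka (based on effbot)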


    def handle_read(self):
        # handle incoming data
        data = self.recv(2048)

        self.data = self.data + data

        if len(data):
            debug('got %d bytes from socket' % len(data))
        else:
            debug('server closed connection')

        while self.data:
            consumer = self.sent_requests[0][1]
            if not self.response:
                # do not have a full response header yet

                # check if we've seen a full header
                debug('getting header for %s' % self.sent_requests[0][0])

                header = self.data.split("\r\n\r\n", 1)
                if len(header) <= 1:
                    return
                header, self.data = header

                self.response = _parse_response_http10(header)
                self.content_remaining = self.response.content_length

            if not self.data:
                return

            # we now know how many (more) content bytes we have, and how
            # much is in the data buffer. there are two main possibilities:
            # too much data, and some must be left behind containing the
            # next response headers, or too little, or possibly just right

            want = self.content_remaining
            if want > 0:
                got_data = self.data[:want]
                self.data = self.data[want:]

                assert got_data

                self.content_remaining -= len(got_data)

                debug('pass back %d bytes of %s, %d remain'
                      % (len(got_data),
                         self.sent_requests[0][0],
                         self.content_remaining))
                # feed the consumer just the bytes sliced off for this
                # response, not the whole recv buffer
                consumer.feed(got_data)

            if self.content_remaining == 0:
                del self.sent_requests[0]

                debug('content complete')
                consumer.content_complete()

                # reset lots of things and try to get the next response header
                if self.response.connection_reply == 'close':
                    debug('server requested close')
                    self.manager._channel_closed(self)
                    self.close()
                elif not self.manager.try_keepalive:
                    debug('no keepalive for this socket')
                    self.manager._channel_closed(self)
                    self.close()
                else:
                    debug("ready for next header...")
                    self.take_one()
                self.response = None


    def handle_close(self):
        debug('async told us of close on %r' % self)
        # if there are outstanding requests we should probably reopen and
        # retransmit, but if we're not making any progress then give up
        self.manager._channel_closed(self)
        self.close()


class DummyConsumer:
    def __init__(self, url, pb):
        self.url = url
        self.outf = None
        self._pb = pb

    def feed(self, data):
        # print "feed", repr(data)
        # print "feed", repr(data[:20]), repr(data[-20:]), len(data)
        if not self.outf:
            base = self.url[self.url.rindex('/')+1:]
            self.outf = file('/tmp/download/' + base, 'wb')
        self.outf.write(data)

    def error(self, err_info):
        import traceback
        error('error reported to consumer')
        traceback.print_exception(err_info[0], err_info[1], err_info[2])
        sys.exit(1)

    def content_complete(self):
        info('content complete from %s' % self.url)
        self.outf.close()
        self.outf = None
        # using last_cnt is cheating
        self._pb.update('downloading inventory',
                        self._pb.last_cnt+1,
                        self._pb.last_total)


if __name__ == "__main__":  | 
|
476  | 
logging.basicConfig(level=logging.DEBUG)  | 
|
477  | 
||
478  | 
mgr = DownloadManager()  | 
|
479  | 
||
480  | 
from bzrlib.branch import Branch  | 
|
481  | 
from bzrlib.progress import ProgressBar  | 
|
482  | 
||
483  | 
pb = ProgressBar()  | 
|
484  | 
revs = Branch('/home/mbp/work/bzr').revision_history()  | 
|
485  | 
pb.update('downloading inventories', 0, len(revs))  | 
|
486  | 
||
487  | 
for rev in revs:  | 
|
488  | 
url = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/inventory-store/' \  | 
|
489  | 
+ rev + '.gz'  | 
|
490  | 
mgr.enqueue(url, DummyConsumer(url, pb))  | 
|
491  | 
||
492  | 
mgr.run()  | 
|
493  | 
||
494  | 
||
495  | 
||
496  | 
||
497  | 
#     for url in ['http://www.bazaar-ng.org/',
 | 
|
498  | 
#                 'http://www.bazaar-ng.org/tutorial.html',
 | 
|
499  | 
#                 'http://www.bazaar-ng.org/download.html',
 | 
|
500  | 
#                 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/mbp@hope-20050415013653-3b3c9c3d33fae0a6.gz',
 | 
|
501  | 
#                 ]:
 |