#! /usr/bin/python

# $Id: http_client.py 271 2004-10-09 10:50:59Z fredrik $
# a simple asynchronous http client (based on SimpleAsyncHTTP.py from
# "Python Standard Library" by Fredrik Lundh, O'Reilly 2001)
#
# HTTP/1.1 and GZIP support added in January 2003 by Fredrik Lundh.
#
# changes:
# 2004-08-26 fl   unified http callback
# 2004-10-09 fl   factored out gzip_consumer support
# 2005-07-08 mbp  experimental support for keepalive connections
#
# Copyright (c) 2001-2004 by Fredrik Lundh.  All rights reserved.
#


"""async/pipelined http client

Use
===

Users of this library pass in URLs they want to see, and consumer
objects that will receive the results at some point in the future.
Any number of requests may be queued up, and more may be added while
the download is in progress.

Requests can be both superscalar and superpipelined.  That is to say,
for each server there can be multiple sockets open, and each socket
may have more than one request in flight.

Design
======

There is a single DownloadManager, and a connection object for each
open socket.

Request/consumer pairs are maintained in queues.  Each connection has
a list of transmitted requests whose response has not yet been
received.  There is also a per-server list of requests that have not
yet been submitted.

When a connection is ready to transmit a new request, it takes one
from the unsubmitted list, sends the request, and adds the request to
its unfulfilled list.  This should happen when the connection has
space for more transmissions or when a new request is added by the
user.  If the connection terminates with unfulfilled requests, they
are put back onto the unsubmitted list to be retried elsewhere.

Because responses on each connection come back precisely in order,
the connection always knows what to expect next: the response for the
next unfulfilled request.
"""
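
# Example (a minimal sketch; the consumer class here is illustrative, not
# part of this module): a consumer is any object providing the feed and
# content_complete methods that HttpChannel calls below (the demo consumer
# at the bottom of this file also has an error method):
#
#     class PrintingConsumer:
#         def feed(self, data):
#             # called with each chunk of response body
#             sys.stdout.write(data)
#
#         def content_complete(self):
#             # called once the whole body has been delivered
#             print 'done'
#
#     mgr = DownloadManager()
#     mgr.enqueue('http://www.bazaar-ng.org/', PrintingConsumer())
#     mgr.run()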

# Note that (as of ubuntu python 2.4.1) every socket.connect() call
# with a hostname does a remote DNS resolution, which is pretty sucky.
# Shouldn't there be a cache in glibc?  We should probably cache the
# address in, say, the DownloadManager.
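#
# A minimal sketch of such a cache (hypothetical; nothing below uses it):
#
#     _addr_cache = {}
#     def _cached_gethostbyname(host):
#         if host not in _addr_cache:
#             _addr_cache[host] = socket.gethostbyname(host)
#         return _addr_cache[host]
#
# connect() could then be handed the cached dotted-quad instead of a name.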

# TODO: A default consumer operation that writes the received data
# into a file; by default the file is named the same as the last
# component of the URL.
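#
# One possible shape for that consumer (a sketch only; the class name and
# the 'index.html' fallback are made up):
#
#     class FileConsumer:
#         def __init__(self, url):
#             path = urlparse.urlparse(url)[2]
#             self._file = open(path.split('/')[-1] or 'index.html', 'wb')
#
#         def feed(self, data):
#             self._file.write(data)
#
#         def content_complete(self):
#             self._file.close()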

# TODO: A utility function that is given a list of URLs, and downloads
# them all parallel/pipelined.  If any fail, it raises an exception
# (and discards the rest), or perhaps can be told to continue anyhow.
# The content is written into temporary files.  It returns a list of
# readable file objects.
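#
# A sketch of that utility on top of DownloadManager (download_all and
# _TempFileConsumer are hypothetical; the fail-fast behaviour is left out):
#
#     def download_all(urls):
#         import tempfile
#         mgr = DownloadManager()
#         results = []
#         for url in urls:
#             f = tempfile.TemporaryFile()
#             mgr.enqueue(url, _TempFileConsumer(f))
#             results.append(f)
#         mgr.run()
#         for f in results:
#             f.seek(0)
#         return results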

import asyncore
import socket, string, time, sys
import StringIO
import mimetools, urlparse, urllib
import logging

logger = logging.getLogger('bzr.http_client')
debug = logger.debug
info = logger.info
error = logger.error


##
# Close connection.   Request handlers can raise this exception to
# indicate that the connection should be closed.

class CloseConnection(Exception):
    pass


##
# Redirect connection.  Request handlers can raise this exception to
# indicate that a new request should be issued.

class Redirect(CloseConnection):
    def __init__(self, location):
        self.location = location


class DownloadManager(object):
    """Handles pipelined/overlapped downloads.

    Pass in a series of URLs with handlers to receive the response.
    This object will spread the requests over however many sockets
    seem useful.

    queued_requests
        Requests not yet assigned to any channel.

    Requests currently assigned to a channel are tracked by that
    channel in its sent_requests list, not on this object.
    """
    def __init__(self):
        self.queued_requests = []
        # self.channel = HttpChannel('localhost', 8000, self)
        self.channels = []
        self.try_pipelined = False
        self.try_keepalive = False
        self.max_channels = 3


    def enqueue(self, url, consumer):
        """Queue a download of url; consumer will be fed the response body."""
        self.queued_requests.append((url, consumer))
        self._wake_up_channel()
126
    def _channel_closed(self, channel):
127
        """Called by the channel when its socket closes.
128
        """
129
        self.channels.remove(channel)
130
        if self.queued_requests:
131
            # might recreate one
132
            self._wake_up_channel()
133
134
135
    def _make_channel(self):
136
        # proxy2 203.17.154.69
137
        # bazaar-ng.org 
138
        return HttpChannel('82.211.81.161', 80, self)
139
        # return HttpChannel('203.17.154.69', 8080, self)
140
        # return HttpChannel('localhost', 8000, self)
141
            
142


    def _wake_up_channel(self):
        """Try to wake up one channel to send the newly-added request.

        There may be more than one request pending, and this may cause
        more than one channel to take requests.  That's OK; some of
        them may be frustrated.
        """
        from random import choice

        # first, wake up any idle channels
        done = False
        for ch in self.channels:
            if not ch.sent_requests:
                ch.take_one()
                done = True
        if done:
            debug("woke existing idle channel(s)")
            return

        if len(self.channels) < self.max_channels:
            newch = self._make_channel()
            self.channels.append(newch)
            newch.take_one()
            debug("created new channel")
            return

        if self.try_pipelined:
            # ask an existing busy channel to take it
            debug("woke busy channel")
            choice(self.channels).take_one()
            return

        debug("request left until a channel's idle")


    def run(self):
        """Run until all outstanding requests have been served."""
        #while self.running_requests or self.queued_requests \
        #          or not self.channel.is_idle():
        #    asyncore.loop(count=1)
        asyncore.loop()


class Response(object):
    """Holds in-flight response."""


def _parse_response_http10(header):
    """Parse an HTTP/1.0 response header block into a Response object."""
    from cStringIO import StringIO

    fp = StringIO(header)
    r = Response()

    r.status = fp.readline().split(" ", 2)
    r.headers = mimetools.Message(fp)

    # we can only(?) expect to do keepalive if we got either a
    # content-length or chunked encoding; otherwise there's no way to know
    # when the content ends apart from through the connection close
    r.content_type = r.headers.get("content-type")
    try:
        r.content_length = int(r.headers.get("content-length"))
    except (ValueError, TypeError):
        r.content_length = None
    debug("seen content length of %r" % r.content_length)

    r.transfer_encoding = r.headers.get("transfer-encoding")
    r.content_encoding = r.headers.get("content-encoding")
    r.connection_reply = r.headers.get("connection")

    # TODO: pass status code to consumer?

    if r.transfer_encoding:
        raise NotImplementedError()

    if r.content_encoding:
        raise NotImplementedError()

    if int(r.status[1]) != 200:
        debug("can't handle response status %r" % r.status)
        raise NotImplementedError()

    if r.content_length is None:
        raise NotImplementedError()

    if r.content_length == 0:
        raise NotImplementedError()

    r.content_remaining = r.content_length

    return r


class HttpChannel(asyncore.dispatcher_with_send):
    """One http socket, pipelining if possible."""
    # asynchronous http client

    user_agent = "http_client.py 1.3ka (based on effbot)"

    proxies = urllib.getproxies()

    def __init__(self, ip_host, ip_port, manager):
        asyncore.dispatcher_with_send.__init__(self)
        self.manager = manager

        # if a response header has been seen, this holds it
        self.response = None

        self.data = ""

        self.chunk_size = None

        self.timestamp = time.time()

        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        debug('connecting...')
        self.connect((ip_host, ip_port))

        # sent_requests holds (url, consumer) pairs for requests that have
        # been transmitted but not yet fully answered
        self.sent_requests = []

        self._outbuf = ''


    def __repr__(self):
        return 'HttpChannel(local_addr=%r)' % (self.getsockname(),)


    def is_idle(self):
        return (not self.sent_requests)


    def handle_connect(self):
        debug("connected")
        self.take_one()


    def take_one(self):
        """Accept one request from the manager if possible."""
        if self.manager.try_pipelined:
            if len(self.sent_requests) > 4:
                return
        else:
            if len(self.sent_requests) > 0:
                return

        try:
            url, consumer = self.manager.queued_requests.pop(0)
            debug('request accepted by channel')
        except IndexError:
            return

        # TODO: If there are too many already in flight, don't take one.
        # TODO: If the socket's not writable (tx buffer full), don't take.
        self._push_request_http10(url, consumer)


    def _push_request_http10(self, url, consumer):
        """Send a request, and add it to the outstanding queue."""
        # TODO: check the url requested is appropriate for this connection

        # TODO: If there are too many requests outstanding or (less likely) the
        # connection fails, queue it for later use.

        # TODO: Keep track of requests that have been sent but not yet fulfilled,
        # because we might need to retransmit them if the connection fails. (Or
        # should the caller do that?)

        request = self._form_request_http10(url)
        debug('send request for %s from %r' % (url, self))

        # dispatcher_with_send handles buffering the data until it can
        # be written, and hooks handle_write.
        self.send(request)

        self.sent_requests.append((url, consumer))


    def _form_request_http10(self, url):
        # TODO: get right vhost name
        request = [
            "GET %s HTTP/1.0" % (url),
            "Host: www.bazaar-ng.org",
            ]

        if self.manager.try_keepalive or self.manager.try_pipelined:
            request.extend([
                "Keep-Alive: 60",
                "Connection: keep-alive",
                ])

        # make sure to include a user agent
        for header in request:
            if string.lower(header).startswith("user-agent:"):
                break
        else:
            request.append("User-Agent: %s" % self.user_agent)

        return string.join(request, "\r\n") + "\r\n\r\n"
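
    # For illustration: with the defaults above, the request goes out
    # proxy-style, with the absolute URL on the request line, e.g.
    #
    #   GET http://www.bazaar-ng.org/ HTTP/1.0\r\n
    #   Host: www.bazaar-ng.org\r\n
    #   User-Agent: http_client.py 1.3ka (based on effbot)\r\n
    #   \r\n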


    def handle_read(self):
        # handle incoming data
        data = self.recv(2048)

        self.data = self.data + data

        if len(data):
            debug('got %d bytes from socket' % len(data))
        else:
            debug('server closed connection')

        while self.data:
            consumer = self.sent_requests[0][1]
            if not self.response:
                # do not have a full response header yet

                # check if we've seen a full header
                debug('getting header for %s' % self.sent_requests[0][0])

                header = self.data.split("\r\n\r\n", 1)
                if len(header) <= 1:
                    return
                header, self.data = header

                self.response = _parse_response_http10(header)
                self.content_remaining = self.response.content_length

            if not self.data:
                return

            # we now know how many (more) content bytes we expect, and how
            # much is in the data buffer.  there are two main possibilities:
            # too much data, in which case some must be left behind holding
            # the next response header; too little; or possibly just right

            want = self.content_remaining
            if want > 0:
                got_data = self.data[:want]
                self.data = self.data[want:]

                assert got_data

                debug('pass back %d bytes of %s' % (len(got_data),
                                                    self.sent_requests[0][0]))
                consumer.feed(got_data)

                self.content_remaining -= len(got_data)

            if self.content_remaining == 0:
                del self.sent_requests[0]
                consumer.content_complete()

                # reset lots of things and try to get the next response header
                if self.response.connection_reply == 'close':
                    debug('server requested close')
                    self.manager._channel_closed(self)
                    self.close()
                elif not self.manager.try_keepalive:
                    debug('no keepalive for this socket')
                    self.manager._channel_closed(self)
                    self.close()
                else:
                    debug("ready for next header...")
                    self.take_one()
                self.response = None


    def handle_close(self):
        debug('async told us of close on %r' % self)
        # if there are outstanding requests we should probably reopen and
        # retransmit, but if we're not making any progress then give up
        self.manager._channel_closed(self)
        self.close()


if __name__ == "__main__":
    class dummy_consumer:
        def __init__(self, url):
            self.url = url

        def feed(self, data):
            # print "feed", repr(data)
            # print "feed", repr(data[:20]), repr(data[-20:]), len(data)
            pass

        def error(self, err_info):
            import traceback
            traceback.print_exception(err_info[0], err_info[1], err_info[2])

        def content_complete(self):
            debug('content complete from %s' % self.url)


    logging.basicConfig(level=logging.DEBUG)

    mgr = DownloadManager()

    from bzrlib.branch import Branch
    revs = Branch('/home/mbp/work/bzr').revision_history()

#     for url in ['http://www.bazaar-ng.org/',
#                 'http://www.bazaar-ng.org/tutorial.html',
#                 'http://www.bazaar-ng.org/download.html',
#                 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/mbp@hope-20050415013653-3b3c9c3d33fae0a6.gz',
#                 ]:

    for rev in revs:
#        url = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/' \
        url = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/inventory-store/' \
              + rev + '.gz'
        mgr.enqueue(url, dummy_consumer(url))

    mgr.run()