问题:我给你10个的url,你帮我去把10url的网址下载。
传统方式
# 传统串行方式 import requests import time
urls = [‘https://github.com/’ for _ in range(10)] start = time.time() for url in urls: response = requests.get(url) # print(response)
spend_time = time.time() – start print(spend_time) # 12.493084907531738
一、多进程和多线程实现并发
import time from concurrent.futures import ProcessPoolExecutor
import requests
start = time.time()
def task(url): response = requests.get(url) return response
def done(future, *args, **kwargs): response = future.result() print(response.url)
if __name__ == ‘__main__’: url_list = [‘https://www.douban.com/’ for _ in range(100)] with ProcessPoolExecutor(max_workers=10) as pool: for url in url_list: v = pool.submit(task, url) v.add_done_callback(done) print(time.time() – start) # 11.862671136856079
花费时间 11.862671136856079秒
#########编写方式二######### import requests from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor()
def task(url): response = requests.get(url) return response
def done(future,*args,**kwargs): response = future.result() print(response.url)
if __name__ == ‘__main__’: start = time.time() with ThreadPoolExecutor(max_workers=os.cpu_count()) as pool: url_list = [‘https://www.douban.com/’ for _ in range(100)] for url in url_list: v = pool.submit(task,url) v.add_done_callback(done) print(time.time() – start) # 8.904985904693604
花费时间 8.904985904693604秒
由于GIL限制,建议:IO密集的任务,用ThreadPoolExecutor;CPU密集任务,用ProcessPoolExcutor。
二、基于事件循环的异步IO
1. asyncio + aiohttp
import time import asyncio
import aiohttp
async def start_request(session, url): sem = asyncio.Semaphore(10, loop=loop) async with sem: print(f’make request to {url}’) with async_timeout.timeout(60): async with session.get(url, verify_ssl=False) as response: if response.status == 200: print(response.status)
async def run(urls): conn = aiohttp.TCPConnector(ssl=False, limit=60, # 连接池在windows下不能太大, <500 use_dns_cache=True) async with aiohttp.ClientSession(connector=conn, loop=loop) as session: datas = await asyncio.gather(*[start_request(session, url) for url in urls], return_exceptions=True) for ind, url in enumerate(urls): if isinstance(datas[ind], Exception): print(f”{ind}, {url}: 下载失败 请重新下载:”)
if __name__ == ‘__main__’: start = time.time() urls = ((‘http://www.baidu.com/’) for _ in range(100)) loop = asyncio.get_event_loop() loop.run_until_complete(run(urls)) print(time.time() – start) # 1.4860568046569824
2. Twisted
import time
from twisted.internet import defer from twisted.internet import reactor from twisted.web.client import getPage
start = time.time()
def one_done(content, arg): response = content.decode(‘utf-8′) # print(response) print(arg)
def all_done(arg): reactor.stop() print(time.time() – start)
@defer.inlineCallbacks def task(url): res = getPage(bytes(url, encoding=’utf8’)) # 发送Http请求 res.addCallback(one_done, url) yield res
url_list = (‘http://www.cnblogs.com’ for _ in range(100))
defer_list = [] # [特殊,特殊,特殊(已经向url发送请求)] for url in url_list: v = task(url) defer_list.append(v)
d = defer.DeferredList(defer_list) d.addBoth(all_done)
reactor.run() # 死循环 # 5.039534091949463
3. tornado
import time
from tornado.httpclient import AsyncHTTPClient from tornado.httpclient import HTTPRequest from tornado import ioloop
COUNT = 0 start = time.time()
def handle_response(response): global COUNT COUNT -= 1 if response.error: print(“Error:”, response.error) else: # print(response.body) print(response.request) # 方法同twisted # ioloop.IOLoop.current().stop() if COUNT == 0: ioloop.IOLoop.current().stop()
def func(): url_list = [‘http://www.baidu.com’ for _ in range(100)] global COUNT COUNT = len(url_list) for url in url_list: print(url) http_client = AsyncHTTPClient() http_client.fetch(HTTPRequest(url), handle_response)
if __name__ == ‘__main__’: ioloop.IOLoop.current().add_callback(func) ioloop.IOLoop.current().start() # 死循环 print(time.time() – start) # 3.0621743202209473
以上均是Python内置以及第三方模块提供异步IO请求模块,使用简便大大提高效率,而对于异步IO请求的本质则是【非阻塞Socket】+【IO多路复用】
“”” ########http请求本质,IO阻塞######## sk = socket.socket() #1.连接 sk.connect((‘www.baidu.com’,80,)) #阻塞 print(‘连接成功了’) #2.连接成功后发送消息 sk.send(b”GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n”)
#3.等待服务端响应 data = sk.recv(8096)#阻塞 print(data) #\r\n\r\n区分响应头和影响体
#关闭连接 sk.close() “”” “”” ########http请求本质,IO非阻塞######## sk = socket.socket() sk.setblocking(False) #1.连接 try: sk.connect((‘www.baidu.com’,80,)) #非阻塞,但会报错 print(‘连接成功了’) except BlockingIOError as e: print(e)
#2.连接成功后发送消息 sk.send(b”GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n”)
#3.等待服务端响应 data = sk.recv(8096)#阻塞 print(data) #\r\n\r\n区分响应头和影响体
#关闭连接 sk.close() “””
异步非阻塞请求的本质
自定义异步非阻塞IO
class HttpRequest: def __init__(self,sk,host,callback): self.socket = sk self.host = host self.callback = callback
def fileno(self): return self.socket.fileno()
class HttpResponse: def __init__(self,recv_data): self.recv_data = recv_data self.header_dict = {} self.body = None
self.initialize()
def initialize(self): headers, body = self.recv_data.split(b’\r\n\r\n’, 1) self.body = body header_list = headers.split(b’\r\n’) for h in header_list: h_str = str(h,encoding=’utf-8′) v = h_str.split(‘:’,1) if len(v) == 2: self.header_dict[v[0]] = v[1]
class AsyncRequest: def __init__(self): self.conn = [] self.connection = [] # 用于检测是否已经连接成功
def add_request(self,host,callback): try: sk = socket.socket() sk.setblocking(0) sk.connect((host,80)) except BlockingIOError as e: pass request = HttpRequest(sk,host,callback) self.conn.append(request) self.connection.append(request)
def run(self):
while True: rlist,wlist,elist = select.select(self.conn,self.connection,self.conn,0.05) for w in wlist: print(w.host,’连接成功…’) # 只要能循环到,表示socket和服务器端已经连接成功 tpl = “GET / HTTP/1.0\r\nHost:%s\r\n\r\n” %(w.host,) w.socket.send(bytes(tpl,encoding=’utf-8′)) self.connection.remove(w) for r in rlist: # r,是HttpRequest recv_data = bytes() while True: try: chunck = r.socket.recv(8096) recv_data += chunck except Exception as e: break response = HttpResponse(recv_data) r.callback(response) r.socket.close() self.conn.remove(r) if len(self.conn) == 0: break
def f1(response): print(‘保存到文件’,response.header_dict)
def f2(response): print(‘保存到数据库’, response.header_dict)
url_list = [ {‘host’:’www.youku.com’,’callback’: f1}, {‘host’:’v.qq.com’,’callback’: f2}, {‘host’:’www.cnblogs.com’,’callback’: f2}, ]
if __name__ == ‘__main__’: req = AsyncRequest() for item in url_list: req.add_request(item[‘host’], item[‘callback’]) req.run()
自定异步非阻塞IO
作者:张亚飞
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明。
神龙|纯净稳定代理IP免费测试>>>>>>>>天启|企业级代理IP免费测试>>>>>>>>IPIPGO|全球住宅代理IP免费测试