
python - Making 1 million requests with aiohttp/asyncio - literally

Reposted · Author: 太空狗 · Updated: 2023-10-30 01:20:31

I followed this tutorial: https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html and everything works fine when I make 50,000 requests. But I need to make 1 million API calls, and then I run into a problem with this code:

url = "http://some_url.com/?id={}"
tasks = set()

sem = asyncio.Semaphore(MAX_SIM_CONNS)
for i in range(1, LAST_ID + 1):
    task = asyncio.ensure_future(bound_fetch(sem, url.format(i)))
    tasks.add(task)

responses = asyncio.gather(*tasks)
return await responses

Because Python needs to create 1 million tasks, it basically just lags and then prints a Killed message in the terminal. Is there any way to use a generator instead of a premade set (or list) of URLs? Thanks.
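For the generator part of the question: URLs can be produced lazily instead of being materialized up front. A minimal sketch (the template string mirrors the snippet above; `url_gen` is an illustrative name, not from the original code), with the caveat that the memory pressure here comes mainly from the million Task objects, not the URL strings themselves:

```python
def url_gen(last_id, template="http://some_url.com/?id={}"):
    # yields one URL at a time instead of building a million-element collection
    for i in range(1, last_id + 1):
        yield template.format(i)

urls = list(url_gen(3))
```

Consuming the generator one item at a time keeps memory flat, but only if the consumer also avoids scheduling every task at once.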

Best Answer

Scheduling all 1 million tasks at once

This is the code you are talking about. It uses up to 3 GB of RAM, so it may well be killed by the operating system if you are low on free memory.

import asyncio
from aiohttp import ClientSession

MAX_SIM_CONNS = 50
LAST_ID = 10**6

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()

async def bound_fetch(sem, url, session):
    async with sem:
        await fetch(url, session)

async def fetch_all():
    url = "http://localhost:8080/?id={}"
    tasks = set()
    async with ClientSession() as session:
        sem = asyncio.Semaphore(MAX_SIM_CONNS)
        for i in range(1, LAST_ID + 1):
            task = asyncio.create_task(bound_fetch(sem, url.format(i), session))
            tasks.add(task)
        return await asyncio.gather(*tasks)

if __name__ == '__main__':
    asyncio.run(fetch_all())
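The semaphore in this version caps concurrency but not memory, since every Task still exists at once. A small self-contained sketch of the same pattern, with the network call replaced by a stub (`fake_fetch`, toy values for the constants, both my own additions), shows that the semaphore really does bound how many coroutines run simultaneously:

```python
import asyncio

MAX_SIM_CONNS = 5   # toy value; the answer uses 50
LAST_ID = 20        # toy value; the answer uses 10**6

active = 0
peak = 0

async def fake_fetch(i):
    # stand-in for session.get(); just records how many calls overlap
    global active, peak
    active += 1
    peak = max(peak, active)
    await asyncio.sleep(0.01)
    active -= 1
    return i

async def bound_fetch(sem, i):
    async with sem:
        return await fake_fetch(i)

async def main():
    sem = asyncio.Semaphore(MAX_SIM_CONNS)
    # all tasks are created up front, just like in the code above
    tasks = [asyncio.create_task(bound_fetch(sem, i)) for i in range(1, LAST_ID + 1)]
    return await asyncio.gather(*tasks)

results = asyncio.run(main())
```

Here `peak` never exceeds `MAX_SIM_CONNS`, yet all 20 Task objects were allocated before any finished, which is exactly what blows up at a million tasks.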

Simplify the work using a queue

Here is my suggestion for how to use asyncio.Queue to pass URLs to worker tasks. The queue is filled on demand; there is no premade list of URLs.

It takes only 30 MB of RAM :)

import asyncio
from aiohttp import ClientSession

MAX_SIM_CONNS = 50
LAST_ID = 10**6

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()

async def fetch_worker(url_queue):
    async with ClientSession() as session:
        while True:
            url = await url_queue.get()
            try:
                if url is None:
                    # all work is done
                    return
                response = await fetch(url, session)
                # ...do something with the response
            finally:
                # calling task_done() is necessary for url_queue.join() to work correctly
                url_queue.task_done()

async def fetch_all():
    url = "http://localhost:8080/?id={}"
    url_queue = asyncio.Queue(maxsize=100)
    worker_tasks = []
    for i in range(MAX_SIM_CONNS):
        wt = asyncio.create_task(fetch_worker(url_queue))
        worker_tasks.append(wt)
    for i in range(1, LAST_ID + 1):
        await url_queue.put(url.format(i))
    for i in range(MAX_SIM_CONNS):
        # tell the workers that the work is done
        await url_queue.put(None)
    await url_queue.join()
    await asyncio.gather(*worker_tasks)

if __name__ == '__main__':
    asyncio.run(fetch_all())
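The queue pattern above can be exercised without aiohttp by swapping the HTTP fetch for a stub. This sketch (toy worker count and ID range, stub `worker` in place of `fetch_worker`, all my own substitutions) demonstrates the same producer/worker/sentinel flow: the bounded queue makes the producer block when full, and every item is consumed exactly once.

```python
import asyncio

NUM_WORKERS = 3  # toy value; the answer uses MAX_SIM_CONNS = 50
LAST_ID = 30     # toy value; the answer uses 10**6

async def worker(queue, results):
    while True:
        item = await queue.get()
        try:
            if item is None:
                # sentinel: no more work for this worker
                return
            await asyncio.sleep(0)  # stand-in for fetch(url, session)
            results.append(item)
        finally:
            queue.task_done()  # required for queue.join() to complete

async def main():
    queue = asyncio.Queue(maxsize=10)  # bounded: producer blocks when full
    results = []
    workers = [asyncio.create_task(worker(queue, results))
               for _ in range(NUM_WORKERS)]
    for i in range(1, LAST_ID + 1):
        await queue.put(i)
    for _ in range(NUM_WORKERS):
        await queue.put(None)  # one sentinel per worker
    await queue.join()
    await asyncio.gather(*workers)
    return results

results = asyncio.run(main())
```

Because at most `maxsize` pending items plus `NUM_WORKERS` in-flight items exist at any moment, memory stays flat no matter how large `LAST_ID` grows.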

Regarding "python - Making 1 million requests with aiohttp/asyncio - literally", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/38831322/
