gpt4 book ai didi

python - 处理长度未知的行 block - 使用异步?

转载 作者:行者123 更新时间:2023-11-30 22:15:25 24 4
gpt4 key购买 nike

我有一个文件,每行都有一个 url。我正在使用 aiohttp 对这些网址批量运行异步请求调用。

文件很大,我的内存很小。我不知道文件中有多少行,用计数器逐行读取整个文件将需要很长时间。

我怎样才能:

  • 将 100,000 行放入列表中
  • 处理这些
  • 暂停文件读取
  • 抓取接下来的 100,000 行= 重复直到我捕获剩下的部分?

我沿着异步的思路思考,但我可能严重误解了这个库。

counter = 0
inputs=[]
async with open("test.txt") as f:
for line in f:
counter=counter+1
if counter%100000 != 0:
inputs.append(line.strip())
else:
await get_req_fn(inputs)
inputs=[]

最佳答案

您正在寻找的“暂停”正是 await 所做的。它会阻塞当前的协程(允许其他协程取得进展),直到它等待的协程完成为止。除了在 open 周围使用 async with 之外,您的代码基本上是正确的。这是一个更完整的版本(未经测试):

import asyncio, aiohttp
from itertools import islice

BATCH_SIZE = 1000 # start with something reasonable

async def main():
async with aiohttp.ClientSession() as session:
with open("test.txt") as f:
while True:
# take the next BATCH_SIZE lines
batch = [line.strip() for line in islice(f, BATCH_SIZE)]
if not batch:
# no more lines - we're done
break
await get_req_fn(batch, session)

在实现get_req_fn时,您需要注意启用并行执行,但也要允许等待整个批处理的完成。其关键成分是协程组合器,这些函数将多个协程聚合成一个可等待对象。功能强大且使用非常简单的一个是 gather :

async def get_req_fn(urls, session):
coros = []
for url in urls:
coros.append(single_req(url, session))
await asyncio.gather(*coros)

gather 启动给定的协程,并在 awaited 时阻塞当前协程,直到所有协程完成。这允许 await get_req_fn(batch, session) 暂停读取,直到下载整个批处理。

最后,single_req 可能如下所示:

async def single_req(url, session):
try:
async with session.get(url) as resp:
text = await resp.text()
# process text, or save it to a file, etc
except IOError as e:
print(f'error fetching {url}: {e}')

所有函数都接受 session objectmain() 中创建,因为为每个请求创建一个新 session 是 strongly discouraged在文档中。

最后,要运行整个过程,请使用以下内容:

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

关于python - 处理长度未知的行 block - 使用异步?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50310231/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com