
python - Using asyncio generators and asyncio.as_completed

Reposted · Author: 行者123 · Updated: 2023-12-01 00:30:47

I have some code that scrapes a URL, parses the information, and then stores it in a database using SQLAlchemy. I am trying to do this asynchronously while limiting the maximum number of simultaneous requests.

Here is my code:

async def get_url(aiohttp_session, url1, url2):
    async with aiohttp_session.get(url1) as r_url1:
        if r_url1.status == 200:
            async with aiohttp_session.get(url2) as r_url2:
                if r_url2.status == 200:
                    return await r_url1.json(), await r_url2.json()

async def url_generator(formatted_start_date, formatted_end_date, machine_id, interval):
    interval_start = formatted_start_date
    interval_end = formatted_start_date + interval

    while interval_end <= formatted_end_date:
        yield (f"https://example.org/start={interval_start}"
               f"Start={datetime.strftime(interval_start, DATETIME_FORMAT)}"
               f"&End={datetime.strftime(interval_end, DATETIME_FORMAT)}"
               f"&machines={machine_id}",
               f"https://example.org/start={interval_start}"
               f"Start={datetime.strftime(interval_start, DATETIME_FORMAT)}"
               f"&End={datetime.strftime(interval_end, DATETIME_FORMAT)}"
               f"&machines={machine_id}&groupby=Job")
        interval_start += interval
        interval_end += interval

async def parse(database, url1_json, url2_json):
    """ Do some parsing and save it using credentials stored in the database object """


def main(database, formatted_start_date, formatted_end_date, machine_id, interval):
    async for url1_json, url2_json in asyncio.as_completed(
            url_generator(formatted_start_date, formatted_end_date, machine_id, interval)):
        parse(database, url1_json, url2_json)

I get the error `yield from should be used as context manager expression`.

I tried looking at the documentation here as well as the synchronization primitives, but I am still confused about where I went wrong and how I should create tasks from a generator.

Best Answer

There are several problems with the posted code:

  • You are trying to use as_completed as an async iterator, iterating over its results with async for. However, as_completed does not return an async iterator (at least not yet) and must be iterated with a regular for, explicitly awaiting each yielded object, as shown in the docs.

  • You are passing an async generator to as_completed, whereas it accepts a plain container or a (regular) iterable.

  • You are using async for in a function not defined with async def, which should be a syntax error. Also, parse() is defined as a coroutine, and you are not awaiting it.
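To make the first point concrete, here is a minimal, self-contained sketch of how as_completed is meant to be consumed, with a toy fetch coroutine standing in for the real HTTP requests:

```python
import asyncio

async def fetch(i):
    # simulate a network call; lower i sleeps longer, so it finishes later
    await asyncio.sleep(0.01 * (3 - i))
    return i

async def main():
    tasks = [asyncio.create_task(fetch(i)) for i in range(3)]
    results = []
    # as_completed takes a plain iterable of awaitables and yields them
    # in completion order; consume it with a regular `for` and an
    # explicit await on each yielded object
    for fut in asyncio.as_completed(tasks):
        results.append(await fut)
    return results

print(asyncio.run(main()))  # fastest task first: [2, 1, 0]
```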

The good news is that since url_generator is already a generator, you don't need as_completed at all; you should be able to simply iterate over it:

async def main(database, formatted_start_date, formatted_end_date,
               machine_id, interval):
    async for url1_json, url2_json in url_generator(
            formatted_start_date, formatted_end_date,
            machine_id, interval):
        await parse(database, url1_json, url2_json)

Note, however, that async for won't automatically parallelize the iteration; it merely allows other coroutines to run in parallel with the coroutine doing the iterating. To parallelize the iteration, you need to call create_task to submit the tasks in parallel, and use an asyncio.Semaphore to limit the number of parallel tasks. For example:

async def parse(database, url1_json, url2_json, limit):
    # async with applied to a semaphore ensures that no more than N
    # coroutines that use the same semaphore enter the "with" block
    # in parallel
    async with limit:
        ... code goes here ...

async def main(database, formatted_start_date, formatted_end_date,
               machine_id, interval):
    limit = asyncio.Semaphore(10)

    # create all coroutines in advance using create_task
    # and run them in parallel, relying on the semaphore
    # to limit the number of simultaneous requests
    tasks = []
    async for url1_json, url2_json in url_generator(
            formatted_start_date, formatted_end_date,
            machine_id, interval):
        # this create_task just creates the task - it will
        # start running when we return to the event loop
        tasks.append(asyncio.create_task(
            parse(database, url1_json, url2_json, limit)))

    # suspend to the event loop, resuming this coroutine only after
    # all the tasks have finished (or any of them raises)
    await asyncio.gather(*tasks)
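As a standalone check of this pattern, the following sketch (with a toy worker in place of parse, and an assumed bookkeeping dict to record concurrency) shows that the semaphore caps how many coroutines are inside the protected block at once:

```python
import asyncio

async def worker(limit, state):
    # only 3 workers at a time may hold the semaphore
    async with limit:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.01)  # stand-in for the real request/parse work
        state["running"] -= 1

async def demo():
    limit = asyncio.Semaphore(3)
    state = {"running": 0, "peak": 0}
    # submit all 10 tasks up front; the semaphore throttles them
    await asyncio.gather(*(asyncio.create_task(worker(limit, state))
                           for _ in range(10)))
    return state["peak"]

print(asyncio.run(demo()))  # peak concurrency equals the semaphore value: 3
```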

Note that url_generator itself doesn't need to be async, because it doesn't await anything. You can define it with def and iterate over it with an ordinary for.
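For illustration, a plain-generator version of the same idea might look as follows (the URL template is simplified and DATETIME_FORMAT is an assumed value, since neither is fully shown in the question):

```python
from datetime import datetime, timedelta

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S"  # assumption; the question doesn't show it

def url_generator(start, end, machine_id, interval):
    # ordinary generator: no awaits, so no need for async def
    interval_start = start
    interval_end = start + interval
    while interval_end <= end:
        yield (f"https://example.org/?Start={interval_start.strftime(DATETIME_FORMAT)}"
               f"&End={interval_end.strftime(DATETIME_FORMAT)}&machines={machine_id}",
               f"https://example.org/?Start={interval_start.strftime(DATETIME_FORMAT)}"
               f"&End={interval_end.strftime(DATETIME_FORMAT)}"
               f"&machines={machine_id}&groupby=Job")
        interval_start += interval
        interval_end += interval

pairs = list(url_generator(datetime(2019, 1, 1), datetime(2019, 1, 2),
                           "m1", timedelta(hours=12)))
print(len(pairs))  # two 12-hour intervals fit in one day
```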

Regarding "python - Using asyncio generators and asyncio.as_completed", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58191733/
