gpt4 book ai didi

python - 创建在协程完成时产生协程结果的生成器

转载 作者:太空狗 更新时间:2023-10-29 22:15:03 25 4
gpt4 key购买 nike

目前,我有一个低效的同步生成器,它按顺序发出许多 HTTP 请求并产生结果。我想使用 asyncioaiohttp并行化请求,从而加速这个生成器,但我想将它保留为普通生成器(而不是 PEP 525 async generator ),以便调用它的非异步代码不需要修改。如何创建这样的生成器?

最佳答案

asyncio.as_completed()采用可迭代的协程或 future ,并按照输入 future 完成的顺序返回可迭代的 future 。 通常,您会遍历其结果并await 来自async 函数的成员...

import asyncio

async def first():
await asyncio.sleep(5)
return 'first'

async def second():
await asyncio.sleep(1)
return 'second'

async def third():
await asyncio.sleep(3)
return 'third'

async def main():
for future in asyncio.as_completed([first(), second(), third()]):
print(await future)

# Prints 'second', then 'third', then 'first'
asyncio.run(main())

...但是为了这个问题的目的,我们想要的是能够从一个普通的生成器中产生这些结果,这样普通的同步代码就可以在不知道 async 的情况下使用它们功能正在幕​​后使用。我们可以通过对 as_completed 调用产生的 future 调用 loop.run_until_complete() 来做到这一点...

import asyncio

async def first():
await asyncio.sleep(5)
return 'first'

async def second():
await asyncio.sleep(1)
return 'second'

async def third():
await asyncio.sleep(3)
return 'third'

def ordinary_generator():
loop = asyncio.get_event_loop()
for future in asyncio.as_completed([first(), second(), third()]):
yield loop.run_until_complete(future)

# Prints 'second', then 'third', then 'first'
for element in ordinary_generator():
print(element)

通过这种方式,我们以一种不需要调用者将任何函数定义为 async,甚至不需要知道 ordinary_generator 在后台使用 asyncio

作为在某些情况下提供更大灵 active 的 ordinary_generator() 的替代实现,我们可以重复调用 asyncio.wait()使用 FIRST_COMPLETED 标志而不是循环遍历 as_completed():

import concurrent.futures

def ordinary_generator():
loop = asyncio.get_event_loop()
pending = [first(), second(), third()]
while pending:
done, pending = loop.run_until_complete(
asyncio.wait(
pending,
return_when=concurrent.futures.FIRST_COMPLETED
)
)
for job in done:
yield job.result()

这种维护 pending 作业列表的方法的优点是我们可以调整它以动态地将作业添加到 pending 列表中。这在我们的异步作业可以向队列中添加不可预测数量的进一步作业的用例中很有用 - 就像网络蜘蛛跟踪它访问的每个页面上的所有链接一样。

一个警告:上面的方法假设我们从主线程调用同步代码,在这种情况下 get_event_loop 保证给我们一个循环,我们不需要 .close 它。如果我们希望 ordinary_generator 可以在非主线程中使用,尤其是那些之前可能已经创建了事件循环的线程,那么生活就会变得更加艰难,因为我们不能依赖 get_event_loop(它在任何还没有事件循环的非主线程上引发 RuntimeError)。在那种情况下,我能想到的最简单的事情就是分离出一个线程来运行我们的asyncio代码,并通过队列与它通信:

def ordinary_generator():
sentinel = object()
queue = Queue()

def thread_entry_point():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
for future in asyncio.as_completed([first(), second(), third()]):
try:
queue.put(loop.run_until_complete(future))
except Exception as e:
queue.put((sentinel, e))
break
loop.close()
queue.put(sentinel)

Thread(target=thread_entry_point).start()
while True:
val = queue.get()
if val is sentinel:
return
if isinstance(val, tuple) and len(val) == 2 and val[0] is sentinel:
raise val[1]
yield val

(将倒数第二个示例中的 run_until_complete 的使用与最后一个示例中的额外线程的使用结合起来,作为练习留给需要这样做的任何读者。)

关于python - 创建在协程完成时产生协程结果的生成器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41901795/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com