gpt4 book ai didi

python-3.x - 在 Python 中追加到合并的异步生成器

转载 作者:行者123 更新时间:2023-12-02 01:24:34 24 4
gpt4 key购买 nike

我正在尝试合并 Python 3.7 中的一堆异步生成器,同时仍在迭代中添加新的异步生成器。我目前正在使用 aiostream 来合并我的生成器:

from asyncio import sleep, run
from aiostream.stream import merge

async def go():
yield 0
await sleep(1)
yield 50
await sleep(1)
yield 100

async def main():
tasks = merge(go(), go(), go())

async for v in tasks:
print(v)

if __name__ == '__main__':
run(main())

但是,一旦循环开始,我需要能够继续添加到正在运行的任务。类似的东西。

from asyncio import sleep, run
from aiostream.stream import merge

async def go():
yield 0
await sleep(1)
yield 50
await sleep(1)
yield 100

async def main():
tasks = merge(go(), go(), go())

async for v in tasks:
if v == 50:
tasks.merge(go())
print(v)

if __name__ == '__main__':
run(main())

我最接近的方法是使用 aiostream 库,但也许仅使用 native asyncio 标准库也可以相当整齐地编写它。

最佳答案

这是一个即使使用大量异步迭代器也应该高效工作的实现:

class merge:
def __init__(self, *iterables):
self._iterables = list(iterables)
self._wakeup = asyncio.Event()

def _add_iters(self, next_futs, on_done):
for it in self._iterables:
it = it.__aiter__()
nfut = asyncio.ensure_future(it.__anext__())
nfut.add_done_callback(on_done)
next_futs[nfut] = it
del self._iterables[:]
return next_futs

async def __aiter__(self):
done = {}
next_futs = {}
def on_done(nfut):
done[nfut] = next_futs.pop(nfut)
self._wakeup.set()

self._add_iters(next_futs, on_done)
try:
while next_futs:
await self._wakeup.wait()
self._wakeup.clear()
for nfut, it in done.items():
try:
ret = nfut.result()
except StopAsyncIteration:
continue
self._iterables.append(it)
yield ret
done.clear()
if self._iterables:
self._add_iters(next_futs, on_done)
finally:
# if the generator exits with an exception, or if the caller stops
# iterating, make sure our callbacks are removed
for nfut in next_futs:
nfut.remove_done_callback(on_done)

def append_iter(self, new_iter):
self._iterables.append(new_iter)
self._wakeup.set()

示例代码所需的唯一更改是该方法名为 append_iter,而不是 merge

关于python-3.x - 在 Python 中追加到合并的异步生成器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51399339/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com