gpt4 book ai didi

python - 在 Python asyncio 中创建并发任务之间的依赖关系

转载 作者:太空宇宙 更新时间:2023-11-04 09:33:23 25 4
gpt4 key购买 nike

我在消费者/生产者关系中有两个任务,由 asyncio.Queue 分隔。如果生产者任务失败,我希望消费者任务也尽快失败,而不是无限期地等待队列。可以独立于生产者任务创建(生成)消费者任务。

一般来说,我想在两个任务之间实现依赖关系,这样一个任务的失败也是另一个任务的失败,同时保持这两个任务并发(即一个不会直接等待另一个)。

这里可以使用什么样的解决方案(例如模式)?

基本上,我在想erlang's "links" .

我认为可以使用回调实现类似的东西,即 asyncio.Task.add_done_callback

谢谢!

最佳答案

来自评论:

The behavior I'm trying to avoid is the consumer being oblivious to the producer's death and waiting indefinitely on the queue. I want the consumer to be notified of the producer's death, and have a chance to react. or just fail, and that even while it's also waiting on the queue.

answer presented by Yigal 除外,另一种方法是设置第三个任务来监视这两个任务,并在另一个完成时取消一个。这可以推广到任何两个任务:

async def cancel_when_done(source, target):
assert isinstance(source, asyncio.Task)
assert isinstance(target, asyncio.Task)
try:
await source
except:
# SOURCE is a task which we expect to be awaited by someone else
pass
target.cancel()

现在在设置生产者和消费者时,您可以将它们与上述功能联系起来。例如:

async def producer(q):
for i in itertools.count():
await q.put(i)
await asyncio.sleep(.2)
if i == 7:
1/0

async def consumer(q):
while True:
val = await q.get()
print('got', val)

async def main():
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
p = loop.create_task(producer(queue))
c = loop.create_task(consumer(queue))
loop.create_task(cancel_when_done(p, c))
await asyncio.gather(p, c)

asyncio.get_event_loop().run_until_complete(main())

关于python - 在 Python asyncio 中创建并发任务之间的依赖关系,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54775678/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com