gpt4 book ai didi

python - 如何在 Python 的 `asyncio.gather` 中正确处理取消的任务

转载 作者:行者123 更新时间:2023-12-01 12:03:54 25 4
gpt4 key购买 nike

所以我又试了一次 asyncio模块现在 3.8 出来了。但是,在尝试优雅地关闭事件循环时,我得到了意想不到的结果。具体来说,我正在收听 SIGINT ,取消运行Task s,收集那些 Task s,然后是 .stop()事件循环。我知道Task s 提高 CancelledError当它们被取消时,它将向上传播并结束我对 asyncio.gather 的调用除非,根据 documentation ,我通过return_exceptions=Trueasyncio.gather ,这会导致 gather等待所有Task s 取消并返回 CancelledError 的数组s。但是,似乎 return_exceptions=True仍然导致我的 gather 立即中断如果我尝试 gather,请调用取消 Task s。

这是重现效果的代码。我正在运行 python 3.8.0:

# demo.py

import asyncio
import random
import signal


async def worker():
sleep_time = random.random() * 3
await asyncio.sleep(sleep_time)
print(f"Slept for {sleep_time} seconds")

async def dispatcher(queue):
while True:
await queue.get()
asyncio.create_task(worker())
tasks = asyncio.all_tasks()
print(f"Running Tasks: {len(tasks)}")

async def shutdown(loop):
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
print(f"Cancelling {len(tasks)} outstanding tasks")
results = await asyncio.gather(*tasks, return_exceptions=True)
print(f"results: {results}")
loop.stop()

async def main():
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(shutdown(loop)))
queue = asyncio.Queue()
asyncio.create_task(dispatcher(queue))

while True:
await queue.put('tick')
await asyncio.sleep(1)


asyncio.run(main())

输出:

>> python demo.py 
Running Tasks: 3
Slept for 0.3071352174511871 seconds
Running Tasks: 3
Running Tasks: 4
Slept for 0.4152310498820644 seconds
Running Tasks: 4
^CCancelling 4 outstanding tasks
Traceback (most recent call last):
File "demo.py", line 38, in <module>
asyncio.run(main())
File "/Users/max.taggart/.pyenv/versions/3.8.0/lib/python3.8/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/Users/max.taggart/.pyenv/versions/3.8.0/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
return future.result()
asyncio.exceptions.CancelledError

我猜关于事件循环还有一些我不明白的地方,但我希望所有 CancelledError s 作为存储在 results 中的对象数组返回然后能够继续而不是立即看到错误。

最佳答案

是什么导致错误?

使用 asyncio.all_tasks() 时出现问题是它返回所有任务,即使是您没有直接创建的任务。按照以下方式更改您的代码以查看您取消的内容:

for task in tasks:
print(task)
task.cancel()

您不仅会看到 worker相关任务,还有:
<Task pending coro=<main() running at ...>

取消 main导致内部困惑 asyncio.run(main())你得到错误。让我们进行快速/肮脏的修改以从取消中排除此任务:
tasks = [
t
for t
in asyncio.all_tasks()
if (
t is not asyncio.current_task()
and t._coro.__name__ != 'main'
)
]

for task in tasks:
print(task)
task.cancel()

现在你会看到你的 results .

loop.stop() 导致错误

当你达到 results ,你会得到另一个错误 Event loop stopped before Future completed .这是因为 asyncio.run(main())想跑到 main()完成的。

您必须重组您的代码以允许您传递到 asyncio.run 的协程完成而不是停止事件循环,或者,例如,使用 loop.run_forever()而不是 asyncio.run .

这是我的意思的快速/肮脏的演示:
async def shutdown(loop):
# ...

global _stopping
_stopping = True
# loop.stop()

_stopping = False

async def main():
# ...

while not _stopping:
await queue.put('tick')
await asyncio.sleep(1)

现在你的代码可以正常工作了。不要在实践中使用上面的代码,这只是一个示例。尝试像我上面提到的那样重构你的代码。

如何正确处理任务

不要使用 asyncio.all_tasks() .

如果您创建了一些您想在将来取消的任务,请将其存储并仅取消存储的任务。伪代码:
i_created = []

# ...

task = asyncio.create_task(worker())
i_created.append(task)

# ...

for task in i_created:
task.cancel()

这可能看起来不方便,但这是一种确保您不会取消不想被取消的事情的方法。

还有一件事

另请注意, asyncio.run() much more不仅仅是启动事件循环。特别是, it cancels完成之前的所有挂起任务。在某些情况下它可能很有用,尽管我建议手动处理所有取消。

关于python - 如何在 Python 的 `asyncio.gather` 中正确处理取消的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59655245/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com