gpt4 book ai didi

python - 异步: terminating all tasks when one of them throws exception

转载 作者:太空宇宙 更新时间:2023-11-03 20:28:29 24 4
gpt4 key购买 nike

我正在编写 python 上下文管理器,它运行异步任务。如果我的经理的任何任务抛出异常,我希望我的经理终止。这是示例代码:

class MyClass:
def __init__(self):
if asyncio.get_event_loop().is_closed():
asyncio.set_event_loop(asyncio.new_event_loop())

self.loop = asyncio.get_event_loop()

def __enter__(self):
return self

def __exit__(self, excType, excValue, tb):
try:
self.loop.run_until_complete(self._exit_loop())
finally:
self.loop.close()

if excType is not None:
print(excType.__name__, ':', excValue)
traceback.print_tb(tb)

async def _exit_loop(self):
tasks = [task for task in asyncio.all_tasks(self.loop) if
task is not asyncio.current_task(self.loop)]
list(map(lambda task: task.cancel(), tasks))

results = await asyncio.gather(*tasks, return_exceptions=True)
self.loop.stop()


async def func1(self):
while True:
print('func1')
await asyncio.sleep(1)

async def func2(self):
i = 5
while i > 0:
print('func2')
await asyncio.sleep(1)
i -= 1
raise Exception

async def _async_start(self):
self.loop.create_task(self.func1())
self.loop.create_task(self.func2())

def start(self):
self.loop.run_until_complete(self._async_start())

with MyClass() as myClass:
myClass.start()
myClass.loop.run_forever()

这是该脚本的输出:

func1
func2
func1
func2
func1
func2
func1
func2
func1
func2
Task exception was never retrieved
func1
future: <Task finished coro=<MyClass.func2() done, defined at /home/framal/Programy/schnapps/schnapps/bottle/client.py:381> exception=Exception()>
Traceback (most recent call last):
File "/home/framal/Programy/schnapps/schnapps/bottle/client.py", line 387, in func2
raise Exception
Exception
func1
func1
func1
.
.
.

我尝试使用自定义异常处理程序,但没有任何效果 - 它们在强制终止进程后立即开始运行。

如何将异常传递给循环,以便它关闭所有其他任务?

最佳答案

我不明白你为什么要这样使用上下文管理器(CM)。也许有更好的方法。

无论如何,如果给定了 CM 并且您将 loop.run_forever() 放入 with block ,这是我知道在这种情况下退出循环的唯一方法,所以控制权传递给 CM 的退出函数是 loop.stop()

这是一个小装饰器,处理除使用 loop.stop() 取消之外的所有异常。

def watchdog(afunc):
@functools.wraps(afunc)
async def run(*args, **kwargs):
try:
await afunc(*args, **kwargs)
except asyncio.CancelledError:
return
except Exception as err:
print(f"exception {err}")
asyncio.get_event_loop().stop()
return run

如果您装饰由 CM 作为任务启动的所有协程(func1func2),例如:

@watchdog
async def func2(self):

然后它将在第一次异常后停止。

关于python - 异步: terminating all tasks when one of them throws exception,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57660884/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com