gpt4 book ai didi

python - 多个异步上下文管理器

转载 作者:太空宇宙 更新时间:2023-11-03 13:27:15 24 4
gpt4 key购买 nike

是否可以在 python 中组合异步上下文管理器?类似于 asyncio.gather,但可以与上下文管理器一起使用。像这样:

async def foo():
async with asyncio.gather_cm(start_vm(), start_vm()) as vm1, vm2:
await vm1.do_something()
await vm2.do_something()

这目前可能吗?

最佳答案

接近 gather_cm 的东西可以用 AsyncExitStack 实现, 在 Python 3.7 中引入:

async def foo():
async with AsyncExitStack() as stack:
vm1, vm2 = await asyncio.gather(
stack.enter_async_context(start_vm()),
stack.enter_async_context(start_vm()))
await vm1.do_something()
await vm2.do_something()

不幸的是,__aexit__ 仍将按顺序运行。这是因为 AsyncExitStack 模拟嵌套的上下文管理器,它们具有明确定义的顺序并且不能重叠。外部上下文管理器的 __aexit__ 获得有关内部上下文管理器是否引发异常的信息。 (数据库句柄的 __aexit__ 可能会在出现异常时使用它来回滚事务,否则会提交它。)并行运行 __aexit__ 会使上下文管理器重叠,并且异常信息不可用或不可靠。因此,虽然 gather(...) 并行运行 __aenter__,但 AsyncExitStack 记录哪个先出现并运行 __aexit__以相反的顺序。

使用异步上下文管理器,像 gather_cm 这样的替代方案将非常有意义。可以放弃嵌套语义并提供一个像“退出池”而不是堆栈一样工作的聚合上下文管理器。导出池采用多个相互独立的上下文管理器,这允许它们的 __aenter____aexit__ 方法并行运行。

棘手的部分是正确处理异常:如果任何 __aenter__ 引发,则必须传播异常以防止运行 with block 。为确保正确性,池必须保证 __aexit__ 将在所有 __aenter__ 已完成的上下文管理器上调用。

这是一个示例实现:

import asyncio
import sys

class gather_cm:
def __init__(self, *cms):
self._cms = cms

async def __aenter__(self):
futs = [asyncio.create_task(cm.__aenter__())
for cm in self._cms]
await asyncio.wait(futs)
# only exit the cms we've successfully entered
self._cms = [cm for cm, fut in zip(self._cms, futs)
if not fut.cancelled() and not fut.exception()]
try:
return tuple(fut.result() for fut in futs)
except:
await self._exit(*sys.exc_info())
raise

async def _exit(self, *args):
# don't use gather() to ensure that we wait for all __aexit__s
# to complete even if one of them raises
done, _pending = await asyncio.wait(
[cm.__aexit__(*args)
for cm in self._cms if cm is not None])
return all(suppress.result() for suppress in done)

async def __aexit__(self, *args):
# Since exits are running in parallel, so they can't see each
# other exceptions. Send exception info from `async with`
# body to all.
return await self._exit(*args)

这个测试程序展示了它是如何工作的:

class test_cm:
def __init__(self, x):
self.x = x
async def __aenter__(self):
print('__aenter__', self.x)
return self.x
async def __aexit__(self, *args):
print('__aexit__', self.x, args)

async def foo():
async with gather_cm(test_cm('foo'), test_cm('bar')) as (cm1, cm2):
print('cm1', cm1)
print('cm2', cm2)

asyncio.run(foo())

关于python - 多个异步上下文管理器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53050553/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com