gpt4 book ai didi

Python协程: Release context manager when pausing

转载 作者:太空狗 更新时间:2023-10-29 21:06:20 25 4
gpt4 key购买 nike

背景:我是一位经验丰富的 Python 程序员,对新的协程/异步/等待功能一无所知。我不能写一个异步的“hello world”来挽救我的生命。

我的问题是:我得到了一个任意协程函数 f。我想编写一个协程函数 g 来包装 f,即我将把 g 给用户,就好像它是 f ,并且用户会调用它并且不会变得更聪明,因为 g 将在后台使用 f。就像装饰普通 Python 函数以添加功能一样。

我想添加的功能:每当程序流进入我的协程时,它就会获取我提供的上下文管理器,一旦程序流离开协程,它就会释放该上下文管理器。流量回来了?重新获取上下文管理器。它又出来了?重新发布它。直到协程完全结束。

为了演示,这里描述了普通生成器的功能:

def generator_wrapper(_, *args, **kwargs):
gen = function(*args, **kwargs)
method, incoming = gen.send, None
while True:
with self:
outgoing = method(incoming)
try:
method, incoming = gen.send, (yield outgoing)
except Exception as e:
method, incoming = gen.throw, e

是否可以使用协程来实现?

最佳答案

协程建立在迭代器之上 - __await__ special method是一个常规迭代器。这允许您将底层迭代器包装在另一个迭代器中。诀窍是您必须使用目标的__await__解包目标的迭代器,然后使用您自己的重新包装您自己的迭代器__await__

作用于实例化协程的核心功能如下所示:

class CoroWrapper:
"""Wrap ``target`` to have every send issued in a ``context``"""
def __init__(self, target: 'Coroutine', context: 'ContextManager'):
self.target = target
self.context = context

# wrap an iterator for use with 'await'
def __await__(self):
# unwrap the underlying iterator
target_iter = self.target.__await__()
# emulate 'yield from'
iter_send, iter_throw = target_iter.send, target_iter.throw
send, message = iter_send, None
while True:
# communicate with the target coroutine
try:
with self.context:
signal = send(message)
except StopIteration as err:
return err.value
else:
send = iter_send
# communicate with the ambient event loop
try:
message = yield signal
except BaseException as err:
send, message = iter_throw, err

请注意,这明确适用于 Coroutine,而不是 Awaitable - Coroutine.__await__ 实现生成器接口(interface)。理论上,Awaitable 不一定提供 __await__().send__await__().throw

这足以传入和传出消息:

import asyncio


class PrintContext:
def __enter__(self):
print('enter')

def __exit__(self, exc_type, exc_val, exc_tb):
print('exit via', exc_type)
return False


async def main_coro():
print(
'wrapper returned',
await CoroWrapper(test_coro(), PrintContext())
)


async def test_coro(delay=0.5):
await asyncio.sleep(delay)
return 2

asyncio.run(main_coro())
# enter
# exit via None
# enter
# exit <class 'StopIteration'>
# wrapper returned 2

您可以将包装部分委托(delegate)给单独的装饰器。这也确保您有一个实际的协程,而不是自定义类 - 一些异步库需要这个。

from functools import wraps


def send_context(context: 'ContextManager'):
"""Wrap a coroutine to issue every send in a context"""
def coro_wrapper(target: 'Callable[..., Coroutine]') -> 'Callable[..., Coroutine]':
@wraps(target)
async def context_coroutine(*args, **kwargs):
return await CoroWrapper(target(*args, **kwargs), context)
return context_coroutine
return coro_wrapper

这允许您直接装饰协程函数:

@send_context(PrintContext())
async def test_coro(delay=0.5):
await asyncio.sleep(delay)
return 2

print('async run returned:', asyncio.run(test_coro()))
# enter
# exit via None
# enter
# exit via <class 'StopIteration'>
# async run returned: 2

关于Python协程: Release context manager when pausing,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56078482/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com