gpt4 book ai didi

python - 从另一个线程调度异步协程,无需大量回调和同步等待

转载 作者:太空狗 更新时间:2023-10-30 02:56:09 24 4
gpt4 key购买 nike

我必须要求澄清 this问题。

我有一个发送消息的协程 send。我想在 loop2(在线程 2 中运行)的 loop1(在线程 1 中运行)中安排它:

async def send_threadsafe(self, message, current_loop=loop2, dest_loop=loop1):
future = asyncio.run_coroutine_threadsafe(
send(message), loop=dest_loop
)

asyncio.run_coroutine_threadsafe 返回的future 是一个concurrent.futures.Future,不能异步等待。

所以问题是:我如何正确地等待 future 和/或我应该如何安排我的 send 以获取可等待的对象?

我知道我能做到:

async def send_threadsafe(...):
future = ...
result = await current_loop.run_in_executor(None, future.result)

但是有没有不使用另一个线程的方法呢?因为 run_in_executor 会将 future.result 发送到线程池,而我不想使用该线程池。

我不想使用 call_soon_threadsafe 的原因是它需要创建多个回调。首先,安排在 loop1 中运行 send。其次,在 loop1 中运行 send 并在 loop2 中安排第三次回调。第三,将结果设置为在第一个回调中创建的 future(因为 asyncio futures 不是线程安全的,我无法设置 loop1 的结果)。

最佳答案

您可以使用 asyncio.wrap_future 从并发 future 获取异步 future :

async def send_threadsafe(self, message, destination, *, loop=loop):
concurrent = asyncio.run_coroutine_threadsafe(
send(message), loop=destination)
return await asyncio.wrap_future(concurrent, loop=loop)

可以通过实现异步执行器来实现同样的事情:

from concurrent.futures import Executor

class AsyncioExecutor(Executor):

def __init__(self, loop):
self.loop = loop

def submit(self, fn, *args, **kwargs):
coro = fn(*args, **kwargs)
return asyncio.run_coroutine_threadsafe(coro, self.loop)

例子:

executor = AsyncioExecutor(remote_loop)
result = await loop.run_in_executor(executor, send, message)

关于python - 从另一个线程调度异步协程,无需大量回调和同步等待,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40341608/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com