gpt4 book ai didi

python - asyncio.run_coroutine_threadsafe 的 future 永远挂起?

转载 作者:行者123 更新时间:2023-12-01 00:42:43 42 4
gpt4 key购买 nike

作为我的 previous question about calling an async function from a synchronous one 的后续行动,我发现了asyncio.run_coroutine_threadsafe

从纸面上看,这看起来很理想。基于this StackOverflow question中的评论,这看起来很理想。我可以创建一个新线程,获取对原始事件循环的引用,并安排异步函数在原始事件循环内运行,同时仅阻塞新线程。

class _AsyncBridge:
def call_async_method(self, function, *args, **kwargs):
print(f"call_async_method {threading.get_ident()}")
event_loop = asyncio.get_event_loop()
thread_pool = ThreadPoolExecutor()
return thread_pool.submit(asyncio.run, self._async_wrapper(event_loop, function, *args, **kwargs)).result()

async def _async_wrapper(self, event_loop, function, *args, **kwargs):
print(f"async_wrapper {threading.get_ident()}")
future = asyncio.run_coroutine_threadsafe(function(*args, **kwargs), event_loop)
return future.result()

这不会出错,但也不会返回。 future 只是挂起,异步调用永远不会被命中。我是否在 call_async_method_async_wrapper 或两者中使用 Future 似乎并不重要;无论我在哪里使用 Future,它都会挂起。

我尝试将 run_coroutine_threadsafe 调用直接放入我的主事件循环中:

event_loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
return_value = future.result()

在这里, future 也悬而未决。

我尝试使用定义的LoopExecutorhere ,这似乎是我需求的准确答案。

event_loop = asyncio.get_event_loop()
loop_executor = LoopExecutor(event_loop)
future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
return_value = future.result()

返回的 Future 也挂起。

我考虑过我阻塞了原来的事件循环,因此计划的任务永远不会运行,所以我创建了一个新的事件循环:

event_loop = asyncio.get_event_loop()
new_event_loop = asyncio.new_event_loop()
print(event_loop == new_event_loop) # sanity check to make sure the new loop is actually different from the existing one - prints False as expected
loop_executor = LoopExecutor(new_event_loop)
future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
return_value = future.result()
return return_value

仍然卡在future.result()处,我不明白为什么。

asyncio.run_coroutine_threadsafe/我使用它的方式有什么问题吗?

最佳答案

我认为有两个问题。第一个是 run_coroutine_threadsafe 仅提交协程,但并未真正运行它。

所以

event_loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
return_value = future.result()

不起作用,因为您从未运行过此循环。

为了让它工作,理论上你可以使用asyncio.run(future),但实际上,你不能,也许是因为它是由run_coroutine_threadsafe提交的>。以下内容将起作用:

import asyncio

async def stop():
await asyncio.sleep(3)

event_loop = asyncio.get_event_loop()
coro = asyncio.sleep(1, result=3)
future = asyncio.run_coroutine_threadsafe(coro, event_loop)
event_loop.run_until_complete(stop())
print(future.result())
<小时/>

第二个问题是,我想你已经注意到你的结构在某种程度上被颠倒了。您应该在单独的线程中运行事件循环,但从主线程提交任务。如果你在单独的线程中提交它,你仍然需要在主线程中运行事件循环来实际执行它。大多数情况下,我建议在单独的线程中创建另一个事件循环。

关于python - asyncio.run_coroutine_threadsafe 的 future 永远挂起?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57238316/

42 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com