gpt4 book ai didi

python - 您如何等待另一个线程提交的回调完成?

转载 作者:太空宇宙 更新时间:2023-11-03 14:41:10 37 4
gpt4 key购买 nike

我有两个共享某些状态的 Python 线程,AB。有一次,A 提交了一个回调,由 B 在其循环中运行,例如:

# This line is executed by A
loop.call_soon_threadsafe(callback)

在此之后我想继续做其他事情,但我想确保 callback 已被 B 运行,然后再这样做。有没有什么办法(除了标准的线程同步原语)让 A 等待回调完成?我知道call_soon_threadsafe 返回一个asyncio.Handle 对象可以取消任务,但我不确定这是否可以用于等待(我仍然不太了解asyncio).

在这种情况下,此回调调用 loop.close() 并取消剩余的任务,之后,在 B 中,在 loop.run_forever( ) 有一个 loop.close()。因此,对于这个用例,特别是一个线程安全机制,它允许我从 A 知道循环何时有效关闭,这对我来说也适用——同样,不涉及互斥锁/条件变量/等.

我知道 asyncio 不是线程安全的,除了极少数异常(exception),但我想知道是否提供了一种方便的方法来实现这一点。


这里是我的意思的一个非常小的片段,以防它有帮助。

import asyncio
import threading
import time

def thread_A():
print('Thread A')
loop = asyncio.new_event_loop()
threading.Thread(target=thread_B, args=(loop,)).start()
time.sleep(1)
handle = loop.call_soon_threadsafe(callback, loop)
# How do I wait for the callback to complete before continuing?
print('Thread A out')

def thread_B(loop):
print('Thread B')
asyncio.set_event_loop(loop)
loop.run_forever()
loop.close()
print('Thread B out')

def callback(loop):
print('Stopping loop')
loop.stop()

thread_A()

我用 asyncio.run_coroutine_threadsafe 试过这个变体但它不起作用,而是线程 A 永远挂起。不确定是我做错了什么还是因为我正在停止循环。

import asyncio
import threading
import time

def thread_A():
global future
print('Thread A')
loop = asyncio.new_event_loop()
threading.Thread(target=thread_B, args=(loop,)).start()
time.sleep(1)
future = asyncio.run_coroutine_threadsafe(callback(loop), loop)
future.result() # Hangs here
print('Thread A out')

def thread_B(loop):
print('Thread B')
asyncio.set_event_loop(loop)
loop.run_forever()
loop.close()
print('Thread B out')

async def callback(loop):
print('Stopping loop')
loop.stop()

thread_A()

最佳答案

回调已设置并且(大部分)忘记了。它们不打算用于您需要从中获得结果的事情。这就是为什么生成的句柄只允许您取消回调(不再需要此回调),仅此而已。

如果您需要在另一个线程中等待来自异步管理协程的结果,请使用协程并将其安排为任务 asyncio.run_coroutine_threadsafe() ;这给你一个 Future() instance ,然后您可以等待完成。

但是,使用 run_coroutine_threadsafe() 停止循环确实需要循环处理比实际能够运行的回调多一轮; run_coroutine_threadsafe() 返回的 Future 将不会收到其调度任务状态更改的通知。您可以通过在关闭循环之前在线程 B 中通过 loop.run_until_complete() 运行 asyncio.sleep(0) 来解决此问题:

def thread_A():
# ...
# when done, schedule the asyncio loop to exit
future = asyncio.run_coroutine_threadsafe(shutdown_loop(loop), loop)
future.result() # wait for the shutdown to complete
print("Thread A out")

def thread_B(loop):
print("Thread B")
asyncio.set_event_loop(loop)
loop.run_forever()
# run one last noop task in the loop to clear remaining callbacks
loop.run_until_complete(asyncio.sleep(0))
loop.close()
print("Thread B out")

async def shutdown_loop(loop):
print("Stopping loop")
loop.stop()

当然,这有点 hacky,取决于回调管理和跨线程任务调度的内部结构是否不变。作为默认的 asyncio 实现,运行单个 noop 任务足以进行几轮回调,从而创建更多正在处理的回调,但替代循环实现可能会以不同方式处理此问题。

因此,对于关闭循环,您最好使用基于线程的协调:

def thread_A():
# ...
callback_event = threading.Event()
loop.call_soon_threadsafe(callback, loop, callback_event)
callback_event.wait() # wait for the shutdown to complete
print("Thread A out")

def thread_B(loop):
print("Thread B")
asyncio.set_event_loop(loop)
loop.run_forever()
loop.close()
print("Thread B out")

def callback(loop, callback_event):
print("Stopping loop")
loop.stop()
callback_event.set()

关于python - 您如何等待另一个线程提交的回调完成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53107032/

37 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com