- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想同时使用 concurrent.futures
中的 ThreadPoolExecutor
和异步函数。
我的程序重复向线程池提交具有不同输入值的函数。在该较大函数中执行的最终任务序列可以按任何顺序,并且我不关心返回值,只关心它们在将来的某个时刻执行。
所以我尝试这样做
async def startLoop():
while 1:
for item in clients:
arrayOfFutures.append(await config.threadPool.submit(threadWork, obj))
wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)
提交的函数是:
async def threadWork(obj):
bool = do_something() # needs to execute before next functions
if bool:
do_a() # can be executed at any time
do_b() # ^
其中 do_b
和 do_a
是异步函数。问题是我收到错误: TypeError: object Future can't be use in ' wait'表达式
,如果我删除await,我会收到另一个错误,提示我需要添加await
。
我想我可以让一切都使用线程,但我真的不想这样做。
最佳答案
我建议仔细阅读 Python 3 的 asyncio development guide ,特别是“并发和多线程”部分。
示例中的主要概念问题是事件循环是单线程的,因此在线程池中执行异步协程没有意义。事件循环和线程交互有几种方式:
每个线程的事件循环。例如:
async def threadWorkAsync(obj):
b = do_something()
if b:
# Run a and b as concurrent tasks
task_a = asyncio.create_task(do_a())
task_b = asyncio.create_task(do_b())
await task_a
await task_b
def threadWork(obj):
# Create run loop for this thread and block until completion
asyncio.run(threadWorkAsync())
def startLoop():
while 1:
arrayOfFutures = []
for item in clients:
arrayOfFutures.append(config.threadPool.submit(threadWork, item))
wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)
在执行器中执行阻塞代码。这允许您使用异步 future 而不是上面的并发 future。
async def startLoop():
while 1:
arrayOfFutures = []
for item in clients:
arrayOfFutures.append(asyncio.run_in_executor(
config.threadPool, threadWork, item))
await asyncio.gather(*arrayOfFutures)
使用线程安全函数跨线程将任务提交到事件循环。例如,您可以在主线程的运行循环中运行所有异步协程,而不是为每个线程创建运行循环:
def threadWork(obj, loop):
b = do_something()
if b:
future_a = asyncio.run_coroutine_threadsafe(do_a(), loop)
future_b = asyncio.run_coroutine_threadsafe(do_b(), loop)
concurrent.futures.wait([future_a, future_b])
async def startLoop():
loop = asyncio.get_running_loop()
while 1:
arrayOfFutures = []
for item in clients:
arrayOfFutures.append(asyncio.run_in_executor(
config.threadPool, threadWork, item, loop))
await asyncio.gather(*arrayOfFutures)
注意:这个示例不应该按字面意思使用,因为它会导致所有协程在主线程中执行,而线程池工作线程只是阻塞。这只是为了展示 run_coroutine_threadsafe()
方法的示例。
关于Python 3 : How to submit an async function to a threadPool?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54758930/
我是一名优秀的程序员,十分优秀!