gpt4 book ai didi

Python 3 : How to submit an async function to a threadPool?

转载 作者:行者123 更新时间:2023-12-01 01:12:03 25 4
gpt4 key购买 nike

我想同时使用 concurrent.futures 中的 ThreadPoolExecutor 和异步函数。

我的程序重复向线程池提交具有不同输入值的函数。在该较大函数中执行的最终任务序列可以按任何顺序,并且我不关心返回值,只关心它们在将来的某个时刻执行。

所以我尝试这样做

async def startLoop():

while 1:
for item in clients:
arrayOfFutures.append(await config.threadPool.submit(threadWork, obj))

wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)

提交的函数是:

async def threadWork(obj):
bool = do_something() # needs to execute before next functions
if bool:
do_a() # can be executed at any time
do_b() # ^

其中 do_bdo_a 是异步函数。问题是我收到错误: TypeError: object Future can't be use in ' wait'表达式,如果我删除await,我会收到另一个错误,提示我需要添加await

我想我可以让一切都使用线程,但我真的不想这样做。

最佳答案

我建议仔细阅读 Python 3 的 asyncio development guide ,特别是“并发和多线程”部分。

示例中的主要概念问题是事件循环是单线程的,因此在线程池中执行异步协程没有意义。事件循环和线程交互有几种方式:

  • 每个线程的事件循环。例如:

     async def threadWorkAsync(obj):
    b = do_something()
    if b:
    # Run a and b as concurrent tasks
    task_a = asyncio.create_task(do_a())
    task_b = asyncio.create_task(do_b())
    await task_a
    await task_b

    def threadWork(obj):
    # Create run loop for this thread and block until completion
    asyncio.run(threadWorkAsync())

    def startLoop():
    while 1:
    arrayOfFutures = []
    for item in clients:
    arrayOfFutures.append(config.threadPool.submit(threadWork, item))

    wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)
  • 在执行器中执行阻塞代码。这允许您使用异步 future 而不是上面的并发 future。

     async def startLoop():
    while 1:
    arrayOfFutures = []
    for item in clients:
    arrayOfFutures.append(asyncio.run_in_executor(
    config.threadPool, threadWork, item))

    await asyncio.gather(*arrayOfFutures)
  • 使用线程安全函数跨线程将任务提交到事件循环。例如,您可以在主线程的运行循环中运行所有异步协程,而不是为每个线程创建运行循环:

     def threadWork(obj, loop):
    b = do_something()
    if b:
    future_a = asyncio.run_coroutine_threadsafe(do_a(), loop)
    future_b = asyncio.run_coroutine_threadsafe(do_b(), loop)
    concurrent.futures.wait([future_a, future_b])

    async def startLoop():
    loop = asyncio.get_running_loop()
    while 1:
    arrayOfFutures = []
    for item in clients:
    arrayOfFutures.append(asyncio.run_in_executor(
    config.threadPool, threadWork, item, loop))

    await asyncio.gather(*arrayOfFutures)

    注意:这个示例不应该按字面意思使用,因为它会导致所有协程在主线程中执行,而线程池工作线程只是阻塞。这只是为了展示 run_coroutine_threadsafe() 方法的示例。

关于Python 3 : How to submit an async function to a threadPool?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54758930/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com