gpt4 book ai didi

python - 在两个不同的线程中运行 asyncio 以即时处理操作

转载 作者:太空宇宙 更新时间:2023-11-04 05:47:27 25 4
gpt4 key购买 nike

我正在尝试异步处理许多操作:我想将操作发送到我的循环并同时在 ProcessPoolExecutor 中运行它们。我想我不知道我将在开始时运行的所有作业,所以我无法定义所有作业然后然后启动事件循环。

我找到的唯一解决方案是运行一个可以处理操作的主线程,以及另一个执行 loop.run_forever 的线程,这似乎可行。但是,我没有看到任何两个单独的线程以这种方式在同一个循环上运行的例子。有没有其他方法可以解决这个问题,如果我的解决方案会出现什么问题?

import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools
import time
import threading


executor = ProcessPoolExecutor(max_workers=3)


def do_work(eventloop, value):
future = eventloop.run_in_executor(executor, functools.partial(process_action, value))
future.add_done_callback(run_job_success)


def process_action(value):
print("Processing %i" % value)
time.sleep(1)
return value


def run_job_success(f):
print("Success : %s" % f.result())


if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop_thread = threading.Thread(target=loop.run_forever)
loop_thread.start()
while True:
msg = recv()
if msg is not None:
do_work(loop, msg)

编辑:我使用 recv 方法获取要接收的作业。

最佳答案

您尝试做的事情有点模棱两可——您说您不知道要在程序开始时运行的所有作业。这很好 - 但是您如何找到您想要运行的作业?在任何情况下,您上面的测试程序都可以(并且应该)通过使用 BaseEventLoop.call_soon 来重写为单线程,以安排您想要的所有 do_work 调用在开始事件循环之前制作:

import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools
import time

def do_work(eventloop, value):
future = eventloop.run_in_executor(executor, functools.partial(process_action, value))
future.add_done_callback(run_job_success)

def process_action(value):
print("Processing %i" % value)
time.sleep(1)
return value

def run_job_success(f):
print("Success : %s" % f.result())

if __name__ == "__main__":
executor = ProcessPoolExecutor(max_workers=3)
loop = asyncio.get_event_loop()
for i in range(5):
loop.call_soon(do_work, loop, i)
loop.run_forever()

或者可以进一步重构为使用协程而不是回调,这通常是使用 asyncio 时的首选样式:

import time
import asyncio
import functools
from concurrent.futures import ProcessPoolExecutor

def do_work(loop, value):
return loop.run_in_executor(executor, functools.partial(process_action, value))

def process_action(value):
print("Processing %i" % value)
time.sleep(1)
return value

@asyncio.coroutine
def main(loop):
tasks = [do_work(loop, i) for i in range(5)]
for fut in asyncio.as_completed(tasks):
result = yield from fut
print("Success : %s" % result)

if __name__ == "__main__":
executor = ProcessPoolExecutor(max_workers=3)
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))

这也使得在完成所有工作后更容易退出程序,而不必使用 Ctrl+C 和 loop.run_forever

您当前的方法是安全的(loop.run_in_executor 在幕后使用 loop.call_soon_threadsafe,这是您可以在事件循环中安全/正确安排工作的唯一方法从一个单独的线程),它只是过于复杂和不必要的; asyncio 的设计使得使用它的程序是单线程的(除非需要运行阻塞操作,这就是 run_in_executor 的用途)。

关于python - 在两个不同的线程中运行 asyncio 以即时处理操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31613422/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com