gpt4 book ai didi

python - 在主进程中异步等待多处理队列

转载 作者:行者123 更新时间:2023-12-04 20:31:34 25 4
gpt4 key购买 nike

我有以下场景:多个工作进程将有关其当前状态的事件发送到事件调度程序。如果我们在主进程中,那么这个事件调度器需要处理所有事件,或者如果我们在工作进程中,则向主进程的事件调度器发出信号来处理这些事件。

这里的主要症结是事件处理也必须在主进程的主线程中,所以我不能只在线程内运行 while True 循环并等待来自那里的工作进程的消息。

所以我所拥有的是:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import current_process, Process, Queue
from threading import current_thread
from time import sleep

def get_q(q):
print("Waiting for the queue ({} / {})\n".format(current_thread().name, current_process().name))
return q.get()

async def message_q(q):
while True:
f = loop.run_in_executor(None, get_q, q)

await f

if f.result() is None:
print("Done")
return;

print("Got the result ({} / {})".format(current_thread().name, current_process().name))
print("Result is: {}\n".format(f.result()))

async def something_else():
while True:
print("Something else\n")
await asyncio.sleep(2)

def other_process(q):
for i in range(5):
print("Putting something in the queue ({})".format(current_process().name))
q.put(i)
sleep(1)

q.put(None)

q = Queue()

Process(target=other_process, args=(q,), daemon=True).start()

loop = asyncio.get_event_loop()
loop.set_default_executor(ThreadPoolExecutor(max_workers=1))
asyncio.ensure_future(message_q(q))
asyncio.ensure_future(something_else())
loop.run_until_complete(asyncio.sleep(6))
other_process()是一个示例性的工作进程,它使用队列向主进程发出信号,主进程运行事件循环来处理内容并等待队列上的任何数据。在实际情况下,这个进程会向事件调度程序发出信号,后者将处理队列消息,将消息传递给主进程事件调度程序,但在这里我简化了一点。

但是,我对此并不十分满意。提交 get_q()一次又一次地来了 ThreadPoolExecutor产生更多的开销,并且不像一个长时间运行的线程那么干净。还有 await f一旦队列中没有更多数据,就没有最佳和阻塞,这会阻止事件循环退出。我的解决方法是发送 None worker 完成并退出后 message_q()如果 None在队列中。

有没有更好的方法来实现这一点?性能非常重要,我希望将 Queue 对象保留在事件调度程序的本地,而不是将其传递给管理工作进程的代码(或需要调用其某种 finalize() 方法)。

最佳答案

我现在将其实现为异步上下文管理器。上下文管理器调用

asyncio.ensure_future(message_q())

在其 __aenter__()方法并添加 None到队列中的 __aexit__()message_q()中关闭无限循环的方法.

然后可以在 async with 中使用上下文管理器。围绕进程生成代码部分的语句,无需手动调用关闭方法。但是,建议调用 await asyncio.sleep(0)__aenter__()确保 message_q()后的方法协程允许上下文管理器初始化队列监听器。否则, message_q()不会立即被调用。这本身不是问题(因为无论如何队列已满),但它会将事件转发延迟到下一个 await发生在代码中。

这些进程应该使用 ProcessPoolExecutor 产生。连同 loop.run_in_executor() ,所以等待进程不会阻塞事件循环。

除了使用 Queue,您可能还想使用 JoinableQueue 来确保在退出上下文管理器之前所有事件都已处理。

关于python - 在主进程中异步等待多处理队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44853757/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com