gpt4 book ai didi

python - 当所有进程都尝试从队列中获取数据并且队列为空时结束处理吗?

转载 作者:太空宇宙 更新时间:2023-11-03 14:38:22 25 4
gpt4 key购买 nike

我想设置一些接受输入并处理它的进程,这个结果是我想要处理的另一个任务。本质上,每个任务都会产生零个或多个新任务(同一类型),最终所有任务都不会产生新任务。

我认为队列对此很有帮助,因此我有一个输入队列和一个结果队列来添加不会产生任何新内容的任务。在任何时候,队列都可能是空的,但如果另一个进程正在处理某项任务,则可以添加更多队列。

因此,我只希望它在所有进程同时尝试从输入队列中获取数据时结束。

我对 python 多处理和一般的多处理都是全新的。

编辑添加我的意思的基本概述:

class Consumer(Process):
def __init__(self, name):
super().__init__(name=name)

def run():
# This is where I would have the task try to get a new task off of the
# queue and then calculate the results and put them into the queue
# After which it would then try to get a new task and repeat

# If this an all other processes are trying to get and the queue is
# empty That is the only time I know that everything is complete and can
# continue
pass

def start_processing():
in_queue = Queue()
results_queue = Queue()
consumers = [Consumer(str(i)) for i in range(cpu_count())]

for i in consumers:
i.start()

# Wait for the above mentioned conditions to be true before continuing

最佳答案

JoinableQueue就是为了满足这个目的而设计的。加入 JoinableQueue 将阻塞,直到有任务正在进行。

您可以按如下方式使用它:主进程将生成一定数量的工作进程,并为它们分配JoinableQueue。工作进程将使用队列来生成和消耗新任务。主进程将加入队列等待,直到没有更多任务正在进行。之后,它将终止工作进程并退出。

一个非常简化的示例(伪代码):

def consumer(queue):
for task in queue.get():
results = process_task(task)

if 'more_tasks' in results:
for new_task in results['more_tasks']:
queue.put(new_task)

# signal the queue that a task has been completed
queue.task_done()

def main():
queue = JoinableQueue()

processes = start_processes(consumer, queue)

for task in initial_tasks:
queue.put(task)

queue.join() # block until all work is done

terminate_processes(processes)

关于python - 当所有进程都尝试从队列中获取数据并且队列为空时结束处理吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46743939/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com