gpt4 book ai didi

python - 告诉多个生产者的消费者没有更多结果

转载 作者:行者123 更新时间:2023-12-01 05:19:08 25 4
gpt4 key购买 nike

我有一些代码将工作添加到一个队列中,由多个工作人员处理,然后将结果放入另一个队列中,由最终工作人员处理。当我有多个生产者向此结果队列添加 Material 时,我如何可靠地向 collector 进程发出信号,表明没有更多内容需要处理?

import multiprocessing
import time

J = multiprocessing.Queue()
R = multiprocessing.Queue()

def intermediate_worker(jobs, results):
while True:
task = jobs.get()
if task is None:
jobs.put(None)
break
print 'working', task
results.put(task)

def collector(result_queue) :
total = 0
while True :
result = result_queue.get()
total += result
print 'collection', total


[multiprocessing.Process(target=intermediate_worker, args=(J,R)).start()
for i in xrange(2)]

multiprocessing.Process(target=collector, args=(R,)).start()

for chunk_dummy in xrange(10) :
J.put(chunk)
J.put(None)

最佳答案

有几种方法。一种是寻找n个毒丸,其中n是你拥有的生产者数量。您已经熟悉这种方法,因为您正在使用它来关闭生产者。这工作正常,但有点笨重 - 您的消费者需要额外的逻辑来跟踪您已经看到了多少毒丸。

我个人最喜欢的是使用 Semaphore帮我数数。如果我们知道有多少生产者还活着,那么就有足够的信息来确定何时关闭消费者 - 我们在(没有生产者活着)(队列为空)时关闭它)

J = multiprocessing.Queue()
R = multiprocessing.Queue()
S = multiprocessing.Semaphore(NUMWORKERS)

def intermediate_worker(jobs, results, sem):
with sem: #context manager handles incrementing/decrementing the semaphore
while True:
task = jobs.get()
if task is None:
jobs.put(None)
break
print 'working', task
results.put(task)

def collector(result_queue, sem) :
total = 0
while sem.get_value() < NUMWORKERS or not result_queue.empty():
result = result_queue.get()
total += result
print 'collection', total

[multiprocessing.Process(target=intermediate_worker, args=(J,R,S)).start()
for i in xrange(NUMWORKERS)]

multiprocessing.Process(target=collector, args=(R,S)).start()

代码很粗糙,完全未经测试,但应该能表达要点。

关于python - 告诉多个生产者的消费者没有更多结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22716657/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com