gpt4 book ai didi

python - multiprocessing.Process 实例完成后我的队列为空

转载 作者:太空宇宙 更新时间:2023-11-04 10:31:10 24 4
gpt4 key购买 nike

我在文件顶部有一个 python 脚本:

result_queue = Queue.Queue()
key_list = *a large list of small items* #(actually from bucket.list() via boto)

我了解到队列是进程安全的数据结构。我有一个方法:

def enqueue_tasks(keys):
for key in keys:
try:
result = perform_scan.delay(key)
result_queue.put(result)
except:
print "failed"

这里的perform_scan.delay()函数实际上调用了一个celery worker,但我认为不相关(它是一个异步进程调用)。

我还有:

def grouper(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return izip_longest(fillvalue=fillvalue, *args)

最后我有一个 main() 函数:

def main():

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
concurrent.futures.wait(futures)
print len(result_queue)

打印语句的结果是 0。然而,如果我在 enqueue_tasks 中包含一个大小为 result_queue 的打印语句,当程序运行时,我可以看到大小在增加,并且有东西被添加到队列中。

关于正在发生的事情的想法?

最佳答案

看起来这个问题有更简单的解决方案。

您正在构建一个 future 列表。 future 的全部意义在于它们是 future 的结果。特别是,无论每个函数返回什么,都是 future 的(最终)值(value)。所以,根本不要做整个“将结果插入队列”的事​​情,只需从任务函数返回它们,然后从 futures 中取出它们。


做到这一点的最简单方法是打破循环,这样每个键都是一个单独的任务,有一个单独的 future 。我不知道这是否适合您的真实代码,但如果适合:

def do_task(key):
try:
return perform_scan.delay(key)
except:
print "failed"

def main():
executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(do_task, key) for key in key_list]
# If you want to do anything with these results, you probably want
# a loop around concurrent.futures.as_completed or similar here,
# rather than waiting for them all to finish, ignoring the results,
# and printing the number of them.
concurrent.futures.wait(futures)
print len(futures)

当然,这不会进行分组。但是你需要它吗?

之所以需要分组,最可能的原因是任务非常小,以至于调度它们(以及对输入和输出进行酸洗)的开销淹没了实际工作。如果那是真的,那么您几乎可以肯定地等到整个批处理完成后再返回任何结果。特别是考虑到你甚至都没有看到结果,直到它们都完成了。 (这种“分成组,处理每个组,合并回来”的模型在数值工作等情况下很常见,其中每个元素可能很小,或者元素可能彼此不独立,但有些组很大足够或独立于其余工作。)

无论如何,这几乎是一样简单:

def do_tasks(keys):
results = []
for key in keys:
try:
result = perform_scan.delay(key)
results.append(result)
except:
print "failed"
return results

def main():
executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
print sum(len(results) for results in concurrent.futures.as_completed(futures))

或者,如果您更喜欢先等待再计算:

def main():
executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
concurrent.futures.wait(futures)
print sum(len(future.result()) for future in futures)

但我再次怀疑你是否需要这个。

关于python - multiprocessing.Process 实例完成后我的队列为空,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26415825/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com