gpt4 book ai didi

python - 多处理池和队列

转载 作者:太空宇宙 更新时间:2023-11-04 08:51:30 25 4
gpt4 key购买 nike

我正在使用带池的多处理。我需要将结构作为参数传递给必须在单独进程中使用的函数。我遇到了 multiprocessing.Pool 的映射函数问题,因为我无法复制 Pool.QueuePool.Array。该结构将用于动态记录每个终止进程的结果。这是我的代码:

import multiprocessing
from multiprocessing import Process, Manager, Queue, Array
import itertools
import time

def do_work(number, out_queue=None):
if out_queue is not None:
print "Treated nb ", number
out_queue.append("Treated nb " + str(number))
return 0


def multi_run_wrapper(iter_values):
return do_work(*iter_values)

def test_pool():
# Get the max cpu
nb_proc = multiprocessing.cpu_count()

pool = multiprocessing.Pool(processes=nb_proc)
total_tasks = 16
tasks = range(total_tasks)

out_queue= Queue() # Use it instead of out_array and change out_queue.append() into out_queue.put() in the do_work() function.
out_array = Array('i', total_tasks)
iter_values = itertools.izip(tasks, itertools.repeat(out_array))
results = pool.map_async(multi_run_wrapper, iter_values)

pool.close()
pool.join()
print results._value
while not out_queue.empty():
print "queue: ", out_queue.get()
print "out array: \n", out_array

if __name__ == "__main__":
test_pool()

我需要在分离进程中启动一个工作程序并将我的输出队列作为参数传递。我还想指定包含有限数量运行进程的池。为此,我使用了 pool.map_async() 函数。不幸的是,上面的代码给了我一个错误:

Exception in thread Thread-2:
Traceback (most recent call last):
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 808, in __bootstrap_inner
self.run()
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 761, in run
self.__target(*self.__args, **self.__kwargs)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
put(task)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__
assert_spawning(self)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning
' through inheritance' % type(self).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance

我相信这是因为 Queue 永远无法复制,正如我在文档中所读的那样。然后我想到让队列成为一个全局变量,这样我就不需要再传递它了,但在我看来那太乱了。我还考虑过使用 multiprocessing.Array

out_array = Array('i', total_tasks)

但是会出现与队列相同的错误:

# ...
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance

我需要使用此功能 - 使用多处理和从子进程交换信息 -在一个相对较大的软件中,所以我希望我的代码保持干净整洁。

如何以优雅的方式将队列传递给我的工作人员?

当然,欢迎使用任何其他方式处理主要规范。

最佳答案

multiprocessing.Pool 不会接受 multiprocessing.Queue 作为其工作队列中的参数。我相信这是因为它在内部使用队列将数据来回发送到工作进程。有几个解决方法:

1) 你真的需要使用队列吗? Pool 函数的一个优点是它们的返回值被发送回主进程。迭代池中的返回值通常比使用单独的队列更好。这也避免了通过检查 queue.empty()

引入的竞争条件

2) 如果您必须使用Queue,您可以使用multiprocessing.Manager 中的队列。这是共享队列的代理,可以作为参数传递给 Pool 函数。

3) 您可以使用 initializer 将普通的 Queue 传递给工作进程创建 Pool 时(如 https://stackoverflow.com/a/3843313 )。这有点骇人听闻。

我上面提到的竞争条件来自:

while not out_queue.empty():
print "queue: ", out_queue.get()

当您有工作进程填充您的队列时,您可能会遇到队列当前为空的情况,因为工作进程正要向其中放入一些东西。如果你此时勾选.empty()你会提前结束。更好的方法是将sentinal 值放入您的队列中,以便在您完成将数据放入其中时发出信号。

关于python - 多处理池和队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34771953/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com