gpt4 book ai didi

python - 当增加迭代次数时,多进程会变成僵尸进程。 mp.Queue() 与 Manager.list() 相比有何优势?

转载 作者:太空宇宙 更新时间:2023-11-03 16:15:38 24 4
gpt4 key购买 nike

我正在尝试使用多重处理来生成 4 个进程,这些进程会强制进行一些计算,并且每个进程在每次迭代中操作我想要在它们之间共享的单个列表对象的机会非常小。不是best practice在指南中,但我需要“很多人”。

对于相对较少的迭代次数,代码可以正常工作,但是当迭代次数增加到一定阈值时,所有四个进程都将进入僵尸状态。他们默默地失败了。

我尝试使用 multiprocessing.Queue() 跟踪对共享列表的修改。它出现于this SO post , this closed Python issue – "not a bug" ,以及一些提到这些的帖子,底层管道可能会过载并且进程会挂起。 SO 帖子中接受的答案非常难以破译,因为有太多多余的代码。

为清楚起见进行了编辑:
文档中的示例执行非常轻量级的操作,几乎总是单个函数调用。因此,我不知道我是否误解和滥用了功能。

指南说:

It is probably best to stick to using queues or pipes for communication between processes rather than using the lower level synchronization primitives from the threading module.

这里的“沟通”是否意味着我在示例中实际所做的事情之外的其他事情?
或者
这是否意味着我应该在队列中共享 my_list 而不是与经理共享?这不是意味着每个进程的每次迭代都会使用 queue.getqueue.put 吗?

If maxsize is less than or equal to zero, the queue size is infinite.

这样做并不能修复我的失败示例中的错误。在我执行 queue.put() 之前,所有数据都存储在普通的 Python 列表中:my_return_list 那么这实际上是由于我提供的链接而失败吗?

与我当前的解决方法相比,是否有更好的方法?我似乎找不到其他人采取类似的方法,我觉得我错过了一些东西。我需要它同时适用于 Windows 和 Linux。

失败示例(取决于 __main__ 下的迭代):

import multiprocessing as mp
import random
import sys

def mutate_list(my_list, proc_num, iterations, queue, lock):
my_return_list = []

if iterations < 1001:
# Works fine
for x in xrange(iterations):
if random.random() < 0.01:
lock.acquire()

print "Process {} changed list from:".format(proc_num)
print my_list
print "to"
random.shuffle(my_list)
print my_list
print "........"
sys.stdout.flush()

lock.release()

my_return_list.append([x, list(my_list)])
else:
for x in xrange(iterations):
# Enters zombie state
if random.random() < 0.01:
lock.acquire()
random.shuffle(my_list)
my_return_list.append([x, list(my_list)])
lock.release()
if x % 1000 == 0:
print "Completed iterations:", x
sys.stdout.flush()

queue.put(my_return_list)


def multi_proc_handler(iterations):

manager = mp.Manager()
ns = manager.list()
ns.extend([x for x in range(10)])
queue = mp.Queue()
lock = manager.Lock()

print "Starting list to share", ns
print ns
sys.stdout.flush()

p = [mp.Process(target=mutate_list, args=(ns,x,iterations,queue,lock)) for x in range(4)]

for process in p: process.start()
for process in p: process.join()

output = [queue.get() for process in p]
return output

if __name__ == '__main__':
# 1000 iterations is fine, 100000 iterations will create zombies
multi_caller = multi_proc_handler(100000)

使用multiprocessing.Manager.list()的解决方法:

import multiprocessing as mp
import random
import sys

def mutate_list(my_list, proc_num, iterations, my_final_list, lock):

for x in xrange(iterations):
if random.random() < 0.01:
lock.acquire()
random.shuffle(my_list)
my_final_list.append([x, list(my_list)])
lock.release()
if x % 10000 == 0:
print "Completed iterations:", x
sys.stdout.flush()


def multi_proc_handler(iterations):

manager = mp.Manager()
ns = manager.list([x for x in range(10)])
lock = manager.Lock()

my_final_list = manager.list() # My Queue substitute

print "Starting list to share", ns
print ns
sys.stdout.flush()

p = [mp.Process(target=mutate_list, args=(ns,x,iterations,my_final_list,
lock)) for x in range(4)]

for process in p: process.start()
for process in p: process.join()

return list(my_final_list)

if __name__ == '__main__':

multi_caller = multi_proc_handler(100000)

最佳答案

队列与列表

在底层,multiprocessing.Queuemanager.list() 都在向缓冲区写入和读取。

队列

shared_queue = multiprocessing.Queue()

当您使用 N 或更多字节(其中 N 取决于许多变量)调用 put 时,它超出了缓冲区可以处理的范围,并且 put 会阻塞。您也许可以通过在另一个进程中调用 get 来取消阻止 put。这是一个使用代码的第一个版本应该很容易执行的实验。我强烈建议您尝试这个实验。

列表

manager = multiprocessing.Manager()
shared_list = manager.list()

当您调用append时,您传递的字节数远少于N个字节,并且写入缓冲区成功。 还有另一个进程从缓冲区读取数据并将其附加到实际的列表。该流程由经理创建。即使您使用 N 或更多字节调用 append,一切都应该继续工作,因为还有另一个进程正在从缓冲区读取。您可以通过这种方式将任意数量的字节传递给另一个进程。

摘要

希望这能澄清为什么您的“解决方法”有效。您将对缓冲区的写入分解为更小的片段,并且有一个辅助进程从缓冲区读取数据,将这些片段放入托管列表中。

关于python - 当增加迭代次数时,多进程会变成僵尸进程。 mp.Queue() 与 Manager.list() 相比有何优势?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38961584/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com