gpt4 book ai didi

Python 池 生成池

转载 作者:行者123 更新时间:2023-12-01 02:35:05 24 4
gpt4 key购买 nike

我有一个函数,它使用 multiprocessing.Pool 并行处理一个数据集中的所有数据。

from multiprocessing import Pool
...
def func():
...
p = Pool(processes=N)
p.map(func, params)
...

但是,我现在想针对 M 个不同的数据集并行运行它。因此,我在上面现有的脚本之上编写了另一个脚本。我再次尝试使用 Pool 来创建 M 个进程(每个进程都会生成一个由 N 进程组成的 Pool)。但是,我收到一个关于守护进程无法生成子进程的错误(这听起来非常摇滚)。因此,我阅读了一些内容,然后将 Pool 换成了不太了解的 multiprocessing.pool.ThreadPool。所以看起来像

p = ThreadPool(processes=M)
p.starmap(func, args)

但是,当我运行此程序时,我发现 ThreadPool 一次仅处理一个数据集。那么我应该使用什么才能拥有一个生成 M 个子级的脚本,每个子级生成 N 个子级并并行执行所有操作。

最佳答案

这是 Manager.Queue() 的示例。 (不是我在评论中写的那样的监视器,我的错)。如果您去掉所有键盘中断异常处理并试图使其正常关闭,那么这确实是一个非常简单的程序,但它仍然没有做到这一点。现在您有了一个外部进程池,这些进程又生成一个池来执行任务。队列用于将任务提供给外部池工作人员,外部池工作人员又将任务提供给工作人员。他们处于无限循环中等待队列中的东西到达。

如果您想管理外部池工作人员并告诉他们,您当然也可以在那里添加控制消息(例如,如果外部池工作人员收到“退出”一词,它将关闭其池并很好地退出)做不同的事情。

from multiprocessing import Pool, Process
from time import sleep
from random import randint
from multiprocessing import Manager
import sys


alist = [1, 2, 3, 4, 5, 6, 7]


def worker(a):
try:
print a
sleep(randint(0, 2))
except KeyboardInterrupt:
pass


def outer_pool(iq, n):
_ip = Pool(processes=7)
try:
while True:
y = iq.get()
_param = []
for _ny in alist:
_param.append("%d - %d - %d" % (n, _ny, y))
_ip.map(worker, _param)

except KeyboardInterrupt:
try:
_ip.terminate()
except:
pass


c_queue = Manager().Queue()

o_processes = []
for t in alist:
p = Process(target=outer_pool, args=(c_queue, t))
p.start()
o_processes.append(p)

try:
while True:
a = randint(42,100)
c_queue.put(a)
except KeyboardInterrupt:

for _p in o_processes:
try:
_p.terminate()
except:
pass
sys.exit(0)

关于Python 池 生成池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46348589/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com