gpt4 book ai didi

python - 在多 CPU 环境中并行化大量功能

转载 作者:太空宇宙 更新时间:2023-11-04 03:40:33 26 4
gpt4 key购买 nike

我想创建一个函数,在给定函数列表和相关参数的情况下,启动尽可能多的进程以并行执行这些任务。正在运行的进程数不能超过我的 CPU 的核心数。当一个进程结束时,它应该被另一个进程替换,直到结束。

我尝试使用 python 池来实现这样的事情。这是我的功能:

from multiprocessing import Pool, cpu_count

CPUS = cpu_count()

def parallelize(functions, args):
results = []
if CPUS > 1:
for i in xrange(0, len(functions), CPUS):
pool = Pool()
for j in xrange(CPUS):
if i + j >= len(functions):
break
results.append(pool.apply_async(functions[i + j], args = args[i + j]))
pool.close()
pool.join()
map(lambda x: x.get(), results)
else:
for i in xrange(len(functions)):
results.append(functions[i](*args[i]))
return results

此实现批量分割函数列表。每个批量维度等于实际 CPU 的数量。问题是它实际上一直等到每批函数完成,然后再次启动另一批进程。
我不希望出现这种行为,因为如果批量中有一个非常慢的函数,其他 CPU 将等待它完成,然后再启动新进程。

什么是正确的做法?

最佳答案

看来你把这个复杂化了。 multiprocessing.Pool 将始终以您指定的进程数运行,无论您向其提供多少工作项。因此,如果您创建 Pool(CPUS),则 Pool 永远不会同时运行超过 CPU 的任务,即使您向它提供 code>CPUS * 100 任务。因此,它满足了您的要求,即在不执行任何特殊工作的情况下,永远不会运行超过 CPU 的任务。因此,您可以遍历整个方法和参数列表,并对它们调用 apply_async,根本不用担心批处理调用。 Pool 将执行所有任务,但一次不会超过 CPUS 个任务:

def parallelize(methods, args):
results = []
if CPUS > 1:
pool = Pool(CPUS)
for method, arg in zip(methods, args):
results.append(pool.apply_async(method, args=arg))
pool.close()
pool.join()
out = map(lambda x: x.get(), results)
else:
for i in xrange(len(methods)):
results.append(methods[i](*args[i]))
return results

关于python - 在多 CPU 环境中并行化大量功能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26677277/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com