gpt4 book ai didi

python - 多进程池与 asyncio.run_in_executor

转载 作者:太空宇宙 更新时间:2023-11-03 15:52:41 35 4
gpt4 key购买 nike

概览

我正在尝试并行化一个文本分类项目,该项目肯定需要很长时间才能完全串行运行。我已经尝试了这两种可能的变体,我相信它们的功能相似,并且对我在资源监视器中看到的每种结果很好奇。

第一个解决方案

我尝试的第一件事是将 Python 的 asynciorun_in_executor() 结合使用。这是我以前用来并行化一些简单作业的方法,所以我想我会试一试。它看起来像这样:

cores = cpu_count()
qty = len(data)
last = 0

coros = []
loop = asyncio.get_event_loop()
for i in range(cores):
top = ceil((qty * (i + 1)) / cores)
cor = loop.run_in_executor(None, vect.transform, data[last: top])
last = int(top)

vectors = loop.run_until_complete(asyncio.gather(*coros))
loop.close()

我的理解是这段代码创建了一个默认的线程池,在每个数据 block 上运行func,并按照它们进来的相同顺序返回结果。我预计这会超出我的 cpu 但它似乎只以 15% 的速度运行。查看各个处理器,其中只有一半在使用,每个处理器的使用率仅为 20%-30% 左右。这种情况甚至会一直持续到我变得不耐烦并停止该程序或尝试更改某些内容以简化它为止。

第二种方案

我采用的第二种方法是利用 multiprocessing 模块及其 Pool().map()。这对我来说是新的,但它的功能似乎与第一个解决方案类似,但使用的是进程,因此我认为它可能会以更接近我预期的方式利用处理器。该解决方案如下所示:

cores = cpu_count()
qty = len(data)
last = 0

chunks = []

for i in range(cores):
top = ceil((qty * (i + 1)) / cores)

chunks.append(data[last: top])
last = int(top)

with Pool(processes=cores) as pool:
vectors = pool.map(vect.transform, chunks)

我认为这段代码应该将数据分块,然后打开新的进程 来处理数据。最初,资源监视器完全符合我的预期。 CPU 使用率达到 100%,所有内核都显示完全活跃。大约 15 秒后,大部分核心都停放了,其中两三个在 15% 左右。几分钟后,每个核心似乎都在这里和那里填满了口袋,在任何给定时间平均约为 45%,并最终完成。

我的期望/问题

稍后在程序中,我将数据拟合到具有 n_jobs=-1 的 sklearn 分类器。发生这种情况时,所有核心都会达到 100%。这就是我想做的。所以我有几个问题:

  1. 什么是 sklearn 而我没有做的?
  2. 在我的方法中是否有什么东西让我的期望不切实际?

最佳答案

再运行几次后,第二种方法开始按预期运行。我不确定是什么让它发生了变化,但它现在以 100% 的速度运行 CPU。

此外,在进行一些挖掘之后,我得出的结论是 sklearn 可能使用 joblib 来并行化作业。查看 joblib,我发现 default parallelizing code n_jobs=-1 与我的第二个解决方案几乎相同。然而,总体而言,joblib 更加灵活,并且可以针对更复杂的用例以更简洁的方式编写。

关于python - 多进程池与 asyncio.run_in_executor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45618893/

35 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com