gpt4 book ai didi

python - 对面向队列的函数使用多处理后没有性能提升

转载 作者:行者123 更新时间:2023-12-01 09:02:43 24 4
gpt4 key购买 nike

我想要优化的真实代码太复杂,无法包含在此处,因此这里是一个简化的示例:

def enumerate_paths(n, k):
"""
John want to go up a flight of stairs that has N steps. He can take
up to K steps each time. This function enumerate all different ways
he can go up this flight of stairs.
"""
paths = []
to_analyze = [(0,)]

while to_analyze:
path = to_analyze.pop()
last_step = path[-1]

if last_step >= n:
# John has reach the top
paths.append(path)
continue

for i in range(1, k + 1):
# possible paths from this point
extended_path = path + (last_step + i,)
to_analyze.append(extended_path)

return paths

输出看起来像这样

>>> enumerate_paths(3, 2)
[(0, 2, 4), (0, 2, 3), (0, 1, 3), (0, 1, 2, 4), (0, 1, 2, 3)]

您可能会发现结果令人困惑,因此这里有一个解释。例如,(0, 1, 2, 4)表示John可以按时间顺序将脚放在第一步、第二步和第四步上,最后他停在第4步,因为他只需要向上走3 个步骤。

我尝试将多处理合并到此代码片段中,但没有观察到性能提升,甚至没有一点点!

import multiprocessing

def enumerate_paths_worker(n, k, queue):
paths = []

while not queue.empty():
path = queue.get()
last_step = path[-1]

if last_step >= n:
# John has reach the top
paths.append(path)
continue

for i in range(1, k + 1):
# possible paths from this point
extended_path = path + (last_step + i,)
queue.put(extended_path)

return paths


def enumerate_paths(n, k):
pool = multiprocessing.Pool()
manager = multiprocessing.Manager()
queue = manager.Queue()

path_init = (0,)
queue.put(path_init)
apply_result = pool.apply_async(enumerate_paths_worker, (n, k, queue))

return apply_result.get()

Python列表to_analysis的作用就像一个任务队列,队列中的每个项目都可以单独处理,所以我认为这个函数有可能通过采用多线程/处理来优化。另请注意,项目的顺序并不重要。事实上,在优化它时,你可以返回一个Python集、一个Numpy数组或一个Pandas数据框,只要它们代表同一组路径即可。

奖励问题:通过使用 Numpy、Pandas 或 Scipy 等科学软件包来完成这样的任务,我可以获得多少性能?

最佳答案

TL;DR

如果您的实际算法不涉及比您在示例中向我们展示的更昂贵的计算,则多处理的通信开销将占主导地位,并使您的计算时间比顺序执行长很多倍。

<小时/>

您对 apply_async 的尝试实际上只使用了池中的一个工作线程,这就是为什么您看不到差异。 apply_async 的设计只是一次为一名 worker 提供服务。此外,如果您的工作人员需要共享中间结果,那么仅将串行版本传递到池中是不够的,因此您必须修改目标函数才能实现这一点。

但正如在简介中已经说过的,只有当计算量足够大以收回进程间通信(和进程创建)的开销时,您的计算才会从多处理中受益。

下面针对一般问题的解决方案使用 JoinableQueue 结合进程终止的哨兵值来同步工作流程。我添加了一个函数 busy_foo 来使计算量更大,以显示多处理具有其优势的情况。

from multiprocessing import Process
from multiprocessing import JoinableQueue as Queue
import time

SENTINEL = 'SENTINEL'

def busy_foo(x = 10e6):
for _ in range(int(x)):
x -= 1


def enumerate_paths(q_analyze, q_result, n, k):
"""
John want to go up a flight of stairs that has N steps. He can take
up to K steps each time. This function enumerate all different ways
he can go up this flight of stairs.
"""
for path in iter(q_analyze.get, SENTINEL):
last_step = path[-1]

if last_step >= n:
busy_foo()
# John has reach the top
q_result.put(path)
q_analyze.task_done()
continue
else:
busy_foo()
for i in range(1, k + 1):
# possible paths from this point
extended_path = path + (last_step + i,)
q_analyze.put(extended_path)
q_analyze.task_done()


if __name__ == '__main__':

N_CORES = 4

N = 6
K = 2

start = time.perf_counter()
q_analyze = Queue()
q_result = Queue()

q_analyze.put((0,))

pool = []
for _ in range(N_CORES):
pool.append(
Process(target=enumerate_paths, args=(q_analyze, q_result, N, K))
)

for p in pool:
p.start()

q_analyze.join() # block until everything is processed

for p in pool:
q_analyze.put(SENTINEL) # let the processes exit gracefully

results = []
while not q_result.empty():
results.append(q_result.get())

for p in pool:
p.join()

print(f'elapsed: {time.perf_counter() - start: .2f} s')

结果

如果我使用上面的代码并将 busy_foo 注释掉,则需要 N=30,K=2(2178309 个结果):

  • ~208s N_CORES=4
  • 2.78s sequential original

Pickling 和 Unpickling、针对锁运行的线程等,导致了这种巨大的差异。

现在,为两者启用 busy_foo 并且 N=6,K=2(21 个结果),需要:

  • 6.45s N_CORES=4
  • 30.46s sequential original

这里的计算量足够大,可以收回开销。

Numpy

Numpy 可以多次加速矢量化操作,但您可能会看到 numpy 在这方面的性能损失。 Numpy 使用连续的内存块作为数组。当您更改数组大小时,整个数组必须再次重建,这与使用 python 列表不同。

关于python - 对面向队列的函数使用多处理后没有性能提升,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52343188/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com