gpt4 book ai didi

python - 如何使用 StdLib 和 Python 3 在一个范围内并行化迭代?

转载 作者:太空宇宙 更新时间:2023-11-03 10:49:58 25 4
gpt4 key购买 nike

几天来我一直在寻找这方面的答案,但无济于事。我可能只是不理解那里漂浮的部分,multiprocessing 模块上的 Python 文档相当大,我不清楚。

假设您有以下 for 循环:

import timeit


numbers = []

start = timeit.default_timer()

for num in range(100000000):
numbers.append(num)

end = timeit.default_timer()

print('TIME: {} seconds'.format(end - start))
print('SUM:', sum(numbers))

输出:

TIME: 23.965870224497916 seconds
SUM: 4999999950000000

对于此示例,假设您有一个 4 核处理器。有没有办法总共创建 4 个进程,其中每个进程都在单独的 CPU 内核上运行并且完成速度大约快 4 倍,所以 24 秒/4 个进程 = ~6 秒?

以某种方式将 for 循环分成 4 个相等的 block ,然后将这 4 个 block 添加到数字列表中以等于相同的总和?有这个 stackoverflow 线程:Parallel Simple For Loop但我不明白。谢谢大家。

最佳答案

是的,这是可行的。您的计算不依赖于中间结果,因此您可以轻松地将任务分成 block 并将其分配给多个进程。这就是所谓的

embarrassingly parallel problem.

这里唯一棘手的部分可能是,首先将范围划分为相当相等的部分。直出我个人lib的两个函数来处理这个:

# mp_utils.py

from itertools import accumulate

def calc_batch_sizes(n_tasks: int, n_workers: int) -> list:
"""Divide `n_tasks` optimally between n_workers to get batch_sizes.

Guarantees batch sizes won't differ for more than 1.

Example:
# >>>calc_batch_sizes(23, 4)
# Out: [6, 6, 6, 5]

In case you're going to use numpy anyway, use np.array_split:
[len(a) for a in np.array_split(np.arange(23), 4)]
# Out: [6, 6, 6, 5]
"""
x = int(n_tasks / n_workers)
y = n_tasks % n_workers
batch_sizes = [x + (y > 0)] * y + [x] * (n_workers - y)

return batch_sizes


def build_batch_ranges(batch_sizes: list) -> list:
"""Build batch_ranges from list of batch_sizes.

Example:
# batch_sizes [6, 6, 6, 5]
# >>>build_batch_ranges(batch_sizes)
# Out: [range(0, 6), range(6, 12), range(12, 18), range(18, 23)]
"""
upper_bounds = [*accumulate(batch_sizes)]
lower_bounds = [0] + upper_bounds[:-1]
batch_ranges = [range(l, u) for l, u in zip(lower_bounds, upper_bounds)]

return batch_ranges

那么你的主脚本应该是这样的:

import time
from multiprocessing import Pool
from mp_utils import calc_batch_sizes, build_batch_ranges


def target_foo(batch_range):
return sum(batch_range) # ~ 6x faster than target_foo1


def target_foo1(batch_range):
numbers = []
for num in batch_range:
numbers.append(num)
return sum(numbers)


if __name__ == '__main__':

N = 100000000
N_CORES = 4

batch_sizes = calc_batch_sizes(N, n_workers=N_CORES)
batch_ranges = build_batch_ranges(batch_sizes)

start = time.perf_counter()
with Pool(N_CORES) as pool:
result = pool.map(target_foo, batch_ranges)
r_sum = sum(result)
print(r_sum)
print(f'elapsed: {time.perf_counter() - start:.2f} s')

请注意,我还将您的 for 循环切换为对范围对象进行简单求和,因为它提供了更好的性能。如果您不能在您的真实应用中执行此操作,列表理解仍然比您的示例中手动填充列表快 60%。

示例输出:

4999999950000000
elapsed: 0.51 s

Process finished with exit code 0

关于python - 如何使用 StdLib 和 Python 3 在一个范围内并行化迭代?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52636002/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com