gpt4 book ai didi

python - Python中的多处理和多线程

转载 作者:行者123 更新时间:2023-12-04 00:53:29 25 4
gpt4 key购买 nike

我有一个 python 程序,它 1)从磁盘读取一个非常大的文件(约 95% 的时间),然后 2)处理并提供一个相对较小的输出(约 5% 的时间)。该程序将在 TeraBytes 文件上运行。

现在我希望通过利用多处理和多线程来优化这个程序。我正在运行的平台是一个在虚拟机上有 4 个处理器的虚拟机。

我计划有一个调度程序进程,它将执行 4 个进程(与处理器相同),然后每个进程应该有一些线程,因为大部分是 I/O。每个线程将处理 1 个文件并将结果报告给主线程,主线程又将通过 IPC 将其报告回调度程序进程。调度程序可以将这些排队并最终以有序的方式将它们写入磁盘

所以想知道如何决定为这种情况创建的进程和线程的数量?有没有一种数学方法可以找出最好的组合。

谢谢

最佳答案

我想我会安排它与你正在做的相反。也就是说,我将创建一个特定大小的线程池来负责产生结果。提交到此池的任务将作为参数传递给处理器池,工作线程可以使用该处理器池来提交受 CPU 限制的工作部分。换句话说,线程池工作人员将主要执行所有与磁盘相关的操作,并将任何 CPU 密集型工作移交给处理器池。

处理器池的大小应该就是您环境中的处理器数量。很难给线程池一个精确的大小;这取决于在 yield 递减定律发挥作用之前它可以处理多少并发磁盘操作。它还取决于您的内存:池越大,占用的内存资源就越大,尤其是在必须将整个文件读入内存进行处理的情况下。因此,您可能必须尝试使用​​此值。下面的代码概述了这些想法。您从线程池中获得的 I/O 操作的重叠比您仅使用小型处理器池所能达到的效果要大:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
import os

def cpu_bound_function(arg1, arg2):
...
return some_result



def io_bound_function(process_pool_executor, file_name):
with open(file_name, 'r') as f:
# Do disk related operations:
. . . # code omitted
# Now we have to do a CPU-intensive operation:
future = process_pool_executor.submit(cpu_bound_function, arg1, arg2)
result = future.result() # get result
return result

file_list = [file_1, file_2, file_n]
N_FILES = len(file_list)
MAX_THREADS = 50 # depends on your configuration on how well the I/O can be overlapped
N_THREADS = min(N_FILES, MAX_THREADS) # no point in creating more threds than required
N_PROCESSES = os.cpu_count() # use the number of processors you have

with ThreadPoolExecutor(N_THREADS) as thread_pool_executor:
with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
results = thread_pool_executor.map(partial(io_bound_function, process_pool_executor), file_list)

重要提示:

另一种更简单的方法是只使用一个处理器池,其大小大于您拥有的 CPU 处理器数量,例如 25 个。工作进程将同时执行 I/O和 CPU 操作。即使您的进程多于 CPU,许多进程仍将处于等待 I/O 完成的等待状态,从而允许 CPU 密集型工作运行。

这种方法的缺点是创建 N 个进程的开销远大于创建 N 个线程 + 少量进程的开销。然而,随着提交到池中的任务的运行时间变得越来越大,这种增加的开销在总运行时间中所占的百分比越来越小。因此,如果您的任务不是微不足道的,那么这可能是一个相当高效的简化。

更新:两种方法的基准

我针对两种处理 24 个大小约为 10,000KB 的文件的方法进行了一些基准测试(实际上,这只是 3 个不同的文件,每个文件处理了 8 次,因此可能已经完成了一些缓存):

方法一(线程池+处理器池)

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
import os
from math import sqrt
import timeit


def cpu_bound_function(b):
sum = 0.0
for x in b:
sum += sqrt(float(x))
return sum

def io_bound_function(process_pool_executor, file_name):
with open(file_name, 'rb') as f:
b = f.read()
future = process_pool_executor.submit(cpu_bound_function, b)
result = future.result() # get result
return result

def main():
file_list = ['/download/httpd-2.4.16-win32-VC14.zip'] * 8 + ['/download/curlmanager-1.0.6-x64.exe'] * 8 + ['/download/Element_v2.8.0_UserManual_RevA.pdf'] * 8
N_FILES = len(file_list)
MAX_THREADS = 50 # depends on your configuration on how well the I/O can be overlapped
N_THREADS = min(N_FILES, MAX_THREADS) # no point in creating more threds than required
N_PROCESSES = os.cpu_count() # use the number of processors you have

with ThreadPoolExecutor(N_THREADS) as thread_pool_executor:
with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
results = list(thread_pool_executor.map(partial(io_bound_function, process_pool_executor), file_list))
print(results)

if __name__ == '__main__':
print(timeit.timeit(stmt='main()', number=1, globals=globals()))

方法 2(仅限处理器池)

from concurrent.futures import ProcessPoolExecutor
from math import sqrt
import timeit


def cpu_bound_function(b):
sum = 0.0
for x in b:
sum += sqrt(float(x))
return sum

def io_bound_function(file_name):
with open(file_name, 'rb') as f:
b = f.read()
result = cpu_bound_function(b)
return result

def main():
file_list = ['/download/httpd-2.4.16-win32-VC14.zip'] * 8 + ['/download/curlmanager-1.0.6-x64.exe'] * 8 + ['/download/Element_v2.8.0_UserManual_RevA.pdf'] * 8
N_FILES = len(file_list)
MAX_PROCESSES = 50 # depends on your configuration on how well the I/O can be overlapped
N_PROCESSES = min(N_FILES, MAX_PROCESSES) # no point in creating more threds than required

with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
results = list(process_pool_executor.map(io_bound_function, file_list))
print(results)

if __name__ == '__main__':
print(timeit.timeit(stmt='main()', number=1, globals=globals()))

结果:

(我有 8 个核心)

线程池 + 处理器池:13.5 秒仅处理器池:13.3 秒

结论:我会首先尝试更简单的方法,即对所有内容都使用处理器池。现在棘手的一点是确定要创建的最大进程数,这是您最初问题的一部分,当它所做的只是 CPU 密集型计算时,它有一个简单的答案。如果您正在阅读的文件数量不是太多,那么这一点是没有意义的;每个文件可以有一个进程。但是,如果您有数百个文件,您将不希望池中有数百个进程(您可以创建多少个进程也有上限,并且还有那些令人讨厌的内存限制)。我无法给你一个确切的数字。如果您确实有大量文件,请从较小的池大小开始并不断增加,直到您没有进一步的好处(当然,您可能不希望处理超过这些测试的最大数量的文件,否则您将永远运行,只是为实际运行确定一个合适的池大小)。

关于python - Python中的多处理和多线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64532146/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com