- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个 python 程序,它 1)从磁盘读取一个非常大的文件(约 95% 的时间),然后 2)处理并提供一个相对较小的输出(约 5% 的时间)。该程序将在 TeraBytes 文件上运行。
现在我希望通过利用多处理和多线程来优化这个程序。我正在运行的平台是一个在虚拟机上有 4 个处理器的虚拟机。
我计划有一个调度程序进程,它将执行 4 个进程(与处理器相同),然后每个进程应该有一些线程,因为大部分是 I/O。每个线程将处理 1 个文件并将结果报告给主线程,主线程又将通过 IPC 将其报告回调度程序进程。调度程序可以将这些排队并最终以有序的方式将它们写入磁盘
所以想知道如何决定为这种情况创建的进程和线程的数量?有没有一种数学方法可以找出最好的组合。
谢谢
最佳答案
我想我会安排它与你正在做的相反。也就是说,我将创建一个特定大小的线程池来负责产生结果。提交到此池的任务将作为参数传递给处理器池,工作线程可以使用该处理器池来提交受 CPU 限制的工作部分。换句话说,线程池工作人员将主要执行所有与磁盘相关的操作,并将任何 CPU 密集型工作移交给处理器池。
处理器池的大小应该就是您环境中的处理器数量。很难给线程池一个精确的大小;这取决于在 yield 递减定律发挥作用之前它可以处理多少并发磁盘操作。它还取决于您的内存:池越大,占用的内存资源就越大,尤其是在必须将整个文件读入内存进行处理的情况下。因此,您可能必须尝试使用此值。下面的代码概述了这些想法。您从线程池中获得的 I/O 操作的重叠比您仅使用小型处理器池所能达到的效果要大:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
import os
def cpu_bound_function(arg1, arg2):
...
return some_result
def io_bound_function(process_pool_executor, file_name):
with open(file_name, 'r') as f:
# Do disk related operations:
. . . # code omitted
# Now we have to do a CPU-intensive operation:
future = process_pool_executor.submit(cpu_bound_function, arg1, arg2)
result = future.result() # get result
return result
file_list = [file_1, file_2, file_n]
N_FILES = len(file_list)
MAX_THREADS = 50 # depends on your configuration on how well the I/O can be overlapped
N_THREADS = min(N_FILES, MAX_THREADS) # no point in creating more threds than required
N_PROCESSES = os.cpu_count() # use the number of processors you have
with ThreadPoolExecutor(N_THREADS) as thread_pool_executor:
with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
results = thread_pool_executor.map(partial(io_bound_function, process_pool_executor), file_list)
重要提示:
另一种更简单的方法是只使用一个处理器池,其大小大于您拥有的 CPU 处理器数量,例如 25 个。工作进程将同时执行 I/O和 CPU 操作。即使您的进程多于 CPU,许多进程仍将处于等待 I/O 完成的等待状态,从而允许 CPU 密集型工作运行。
这种方法的缺点是创建 N 个进程的开销远大于创建 N 个线程 + 少量进程的开销。然而,随着提交到池中的任务的运行时间变得越来越大,这种增加的开销在总运行时间中所占的百分比越来越小。因此,如果您的任务不是微不足道的,那么这可能是一个相当高效的简化。
更新:两种方法的基准
我针对两种处理 24 个大小约为 10,000KB 的文件的方法进行了一些基准测试(实际上,这只是 3 个不同的文件,每个文件处理了 8 次,因此可能已经完成了一些缓存):
方法一(线程池+处理器池)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
import os
from math import sqrt
import timeit
def cpu_bound_function(b):
sum = 0.0
for x in b:
sum += sqrt(float(x))
return sum
def io_bound_function(process_pool_executor, file_name):
with open(file_name, 'rb') as f:
b = f.read()
future = process_pool_executor.submit(cpu_bound_function, b)
result = future.result() # get result
return result
def main():
file_list = ['/download/httpd-2.4.16-win32-VC14.zip'] * 8 + ['/download/curlmanager-1.0.6-x64.exe'] * 8 + ['/download/Element_v2.8.0_UserManual_RevA.pdf'] * 8
N_FILES = len(file_list)
MAX_THREADS = 50 # depends on your configuration on how well the I/O can be overlapped
N_THREADS = min(N_FILES, MAX_THREADS) # no point in creating more threds than required
N_PROCESSES = os.cpu_count() # use the number of processors you have
with ThreadPoolExecutor(N_THREADS) as thread_pool_executor:
with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
results = list(thread_pool_executor.map(partial(io_bound_function, process_pool_executor), file_list))
print(results)
if __name__ == '__main__':
print(timeit.timeit(stmt='main()', number=1, globals=globals()))
方法 2(仅限处理器池)
from concurrent.futures import ProcessPoolExecutor
from math import sqrt
import timeit
def cpu_bound_function(b):
sum = 0.0
for x in b:
sum += sqrt(float(x))
return sum
def io_bound_function(file_name):
with open(file_name, 'rb') as f:
b = f.read()
result = cpu_bound_function(b)
return result
def main():
file_list = ['/download/httpd-2.4.16-win32-VC14.zip'] * 8 + ['/download/curlmanager-1.0.6-x64.exe'] * 8 + ['/download/Element_v2.8.0_UserManual_RevA.pdf'] * 8
N_FILES = len(file_list)
MAX_PROCESSES = 50 # depends on your configuration on how well the I/O can be overlapped
N_PROCESSES = min(N_FILES, MAX_PROCESSES) # no point in creating more threds than required
with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
results = list(process_pool_executor.map(io_bound_function, file_list))
print(results)
if __name__ == '__main__':
print(timeit.timeit(stmt='main()', number=1, globals=globals()))
结果:
(我有 8 个核心)
线程池 + 处理器池:13.5 秒仅处理器池:13.3 秒
结论:我会首先尝试更简单的方法,即对所有内容都使用处理器池。现在棘手的一点是确定要创建的最大进程数,这是您最初问题的一部分,当它所做的只是 CPU 密集型计算时,它有一个简单的答案。如果您正在阅读的文件数量不是太多,那么这一点是没有意义的;每个文件可以有一个进程。但是,如果您有数百个文件,您将不希望池中有数百个进程(您可以创建多少个进程也有上限,并且还有那些令人讨厌的内存限制)。我无法给你一个确切的数字。如果您确实有大量文件,请从较小的池大小开始并不断增加,直到您没有进一步的好处(当然,您可能不希望处理超过这些测试的最大数量的文件,否则您将永远运行,只是为实际运行确定一个合适的池大小)。
关于python - Python中的多处理和多线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64532146/
我正在处理一组标记为 160 个组的 173k 点。我想通过合并最接近的(到 9 或 10 个组)来减少组/集群的数量。我搜索过 sklearn 或类似的库,但没有成功。 我猜它只是通过 knn 聚类
我有一个扁平数字列表,这些数字逻辑上以 3 为一组,其中每个三元组是 (number, __ignored, flag[0 or 1]),例如: [7,56,1, 8,0,0, 2,0,0, 6,1,
我正在使用 pipenv 来管理我的包。我想编写一个 python 脚本来调用另一个使用不同虚拟环境(VE)的 python 脚本。 如何运行使用 VE1 的 python 脚本 1 并调用另一个 p
假设我有一个文件 script.py 位于 path = "foo/bar/script.py"。我正在寻找一种在 Python 中通过函数 execute_script() 从我的主要 Python
这听起来像是谜语或笑话,但实际上我还没有找到这个问题的答案。 问题到底是什么? 我想运行 2 个脚本。在第一个脚本中,我调用另一个脚本,但我希望它们继续并行,而不是在两个单独的线程中。主要是我不希望第
我有一个带有 python 2.5.5 的软件。我想发送一个命令,该命令将在 python 2.7.5 中启动一个脚本,然后继续执行该脚本。 我试过用 #!python2.7.5 和http://re
我在 python 命令行(使用 python 2.7)中,并尝试运行 Python 脚本。我的操作系统是 Windows 7。我已将我的目录设置为包含我所有脚本的文件夹,使用: os.chdir("
剧透:部分解决(见最后)。 以下是使用 Python 嵌入的代码示例: #include int main(int argc, char** argv) { Py_SetPythonHome
假设我有以下列表,对应于及时的股票价格: prices = [1, 3, 7, 10, 9, 8, 5, 3, 6, 8, 12, 9, 6, 10, 13, 8, 4, 11] 我想确定以下总体上最
所以我试图在选择某个单选按钮时更改此框架的背景。 我的框架位于一个类中,并且单选按钮的功能位于该类之外。 (这样我就可以在所有其他框架上调用它们。) 问题是每当我选择单选按钮时都会出现以下错误: co
我正在尝试将字符串与 python 中的正则表达式进行比较,如下所示, #!/usr/bin/env python3 import re str1 = "Expecting property name
考虑以下原型(prototype) Boost.Python 模块,该模块从单独的 C++ 头文件中引入类“D”。 /* file: a/b.cpp */ BOOST_PYTHON_MODULE(c)
如何编写一个程序来“识别函数调用的行号?” python 检查模块提供了定位行号的选项,但是, def di(): return inspect.currentframe().f_back.f_l
我已经使用 macports 安装了 Python 2.7,并且由于我的 $PATH 变量,这就是我输入 $ python 时得到的变量。然而,virtualenv 默认使用 Python 2.6,除
我只想问如何加快 python 上的 re.search 速度。 我有一个很长的字符串行,长度为 176861(即带有一些符号的字母数字字符),我使用此函数测试了该行以进行研究: def getExe
list1= [u'%app%%General%%Council%', u'%people%', u'%people%%Regional%%Council%%Mandate%', u'%ppp%%Ge
这个问题在这里已经有了答案: Is it Pythonic to use list comprehensions for just side effects? (7 个答案) 关闭 4 个月前。 告
我想用 Python 将两个列表组合成一个列表,方法如下: a = [1,1,1,2,2,2,3,3,3,3] b= ["Sun", "is", "bright", "June","and" ,"Ju
我正在运行带有最新 Boost 发行版 (1.55.0) 的 Mac OS X 10.8.4 (Darwin 12.4.0)。我正在按照说明 here构建包含在我的发行版中的教程 Boost-Pyth
学习 Python,我正在尝试制作一个没有任何第 3 方库的网络抓取工具,这样过程对我来说并没有简化,而且我知道我在做什么。我浏览了一些在线资源,但所有这些都让我对某些事情感到困惑。 html 看起来
我是一名优秀的程序员,十分优秀!