gpt4 book ai didi

python - 使用并发 future python 3.5 处理大文件的最快方法

转载 作者:行者123 更新时间:2023-12-03 13:09:10 26 4
gpt4 key购买 nike

我正在尝试使用并发 future 来掌握多线程/多处理。

我试过使用以下几组代码。我知道我总是会遇到磁盘 IO 问题,但我想最大限度地提高我的 ram 和 CPU 使用率。

大规模处理最常用/最好的方法是什么?

如何使用并发 Future 来处理大型数据集?

是否有比以下方法更受欢迎的方法?

方法一:

for folders in os.path.isdir(path):
p = multiprocessing.Process(pool.apply_async(process_largeFiles(folders)))
jobs.append(p)
p.start()

方法二:

with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
for folders in os.path.isdir(path):
executor.submit(process_largeFiles(folders), 100)

方法三:

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
for folders in os.path.isdir(path):
executor.submit(process_largeFiles(folders), 10)

我应该尝试同时使用进程池和线程池吗?

方法(思想):

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as process:
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as thread:
for folders in os.path.isdir(path):
process.submit(thread.submit(process_largeFiles(folders), 100),10)

在最广泛的用例中最大化我的 ram 和 cpu 的最有效方法是什么?

我知道启动进程需要一些时间,但我正在处理的文件的大小会超过它吗?

最佳答案

使用 TreadPoolExecutor 打开并读取文件,然后使用 ProcessPoolExecutor 处理数据。

import concurrent.futures
from collections import deque

TPExecutor = concurrent.futures.ThreadPoolExecutor
PPExecutor = concurrent.futures.ProcessPoolExecutor
def get_file(path):
with open(path) as f:
data = f.read()
return data

def process_large_file(s):
return sum(ord(c) for c in s)

files = [filename1, filename2, filename3, filename4, filename5,
filename6, filename7, filename8, filename9, filename0]

results = []
completed_futures = collections.deque()

def callback(future, completed=completed_futures):
completed.append(future)

with TPExecutor(max_workers = 4) as thread_pool_executor:
data_futures = [thread_pool_executor.submit(get_file, path) for path in files]
with PPExecutor() as process_pool_executor:
for data_future in concurrent.futures.as_completed(data_futures):
future = process_pool_executor.submit(process_large_file, data_future.result())
future.add_done_callback(callback)
# collect any that have finished
while completed_futures:
results.append(completed_futures.pop().result())

使用完成回调,因此不必等待完成的 future 。我不知道这如何影响效率 - 主要使用它来简化 as_completed 循环中的逻辑/代码。

如果由于内存限制而需要限制文件或数据提交,则需要对其进行重构。根据文件读取时间和处理时间,很难说在任何给定时刻内存中有多少数据。我认为在 as_completed 中收集结果应该有助于缓解这种情况。 data_futures 可能会在 ProcessPoolExecutor 设置时开始完成 - 可能需要优化排序。

关于python - 使用并发 future python 3.5 处理大文件的最快方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42941584/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com