gpt4 book ai didi

python - 使用 ThreadPoolExecutor 减少内存占用

转载 作者:行者123 更新时间:2023-12-01 08:51:19 25 4
gpt4 key购买 nike

我正在使用 ThreadPoolExecutor 来下载大量(~400k)关键帧图像。关键帧名称存储在文本文件中(假设keyframes_list.txt)。

我修改了 documentation 中提供的示例它似乎完美地工作,但有一个异常(exception):很明显,该示例将每个链接传递给一个 future 对象,这些对象都传递给一个可迭代对象(dict())精确的)。该可迭代对象作为参数传递给 as_completed() 函数,以检查 future 何时完成。这当然需要一次性将大量文本加载到内存中。我执行此任务的 python 进程占用了 1GB RAM。

完整代码如下:

import concurrent.futures
import requests

def download_keyframe(keyframe_name):
url = 'http://server/to//Keyframes/{}.jpg'.format(keyframe_name)
r = requests.get(url, allow_redirects=True)
open('path/to/be/saved/keyframes/{}.jpg'.format(keyframe_name), 'wb').write(r.content)
return True

keyframes_list_path = '/path/to/keyframes_list.txt'
future_to_url = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
with open(keyframes_list_path, 'r') as f:
for i, line in enumerate(f):
fields = line.split('\t')
keyframe_name = fields[0]
future_to_url[executor.submit(download_keyframe, keyframe_name)] = keyframe_name
for future in concurrent.futures.as_completed(future_to_url):
keyframe_name = future_to_url[future]
try:
future.result()
except Exception as exc:
print('%r generated an exception: %s' % (keyframe_name, exc))
else:
print('Keyframe: {} was downloaded.'.format(keyframe_name))

所以,我的问题是如何提供可迭代并保持较低的内存占用。我考虑过使用queue,但我不确定它是否能与ThreadPoolExecutor顺利配合。有没有一种简单的方法来控制提交给 ThreadPoolExecutor 的 Future 数量?

最佳答案

AdamKG 的回答是一个好的开始,但他的代码将等到一个 block 被完全处理后再开始处理下一个 block 。因此,您会损失一些性能。

我建议采用稍微不同的方法,将连续的任务流提供给执行器,同时强制并行任务的最大数量上限,以保持较低的内存占用。

诀窍是使用 concurrent.futures.wait 来跟踪已完成的 future 和仍待完成的 future:

def download_keyframe(keyframe_name):
try:
url = 'http://server/to//Keyframes/{}.jpg'.format(keyframe_name)
r = requests.get(url, allow_redirects=True)
open('path/to/be/saved/keyframes/{}.jpg'.format(keyframe_name), 'wb').write(r.content)
except Exception as e:
return keyframe_name, e

return keyframe_name, None

MAX_WORKERS = 8
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
with open(keyframes_list_path, 'r') as fh:
futures_notdone = set()
futures_done = set()
for i, line in enumerate(fh):
# Submit new task to executor.
fields = line.split('\t')
keyframe_name = fields[0]
futures_notdone.add(executor.submit(download_keyframe, keyframe_name))

# Enforce upper bound on number of parallel tasks.
if len(futures_notdone) >= MAX_WORKERS:
done, futures_notdone = concurrent.futures.wait(futures_notdone, return_when=concurrent.futures.FIRST_COMPLETED)
futures_done.update(done)

# Process results.
for future in futures_done:
keyframe_name, exc = future.result()
if exc:
print('%r generated an exception: %s' % (keyframe_name, exc))
else:
print('Keyframe: {} was downloaded.'.format(keyframe_name))

当然,您也可以定期处理循环内的结果,以便时不时地清空futures_done。例如,每次 futures_done 中的项目数量超过 1000(或任何其他适合您需求的数量)时,您都可以执行此操作。如果您的数据集非常大并且结果本身就会导致大量内存使用,这可能会派上用场。

关于python - 使用 ThreadPoolExecutor 减少内存占用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53104082/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com