gpt4 book ai didi

python-3.x - 多处理 HDF5 文件读取,更好的是,多个连接或重复生成

转载 作者:行者123 更新时间:2023-12-04 01:27:19 27 4
gpt4 key购买 nike

我正在尝试并行处理数据。我有一个存储数据的 HDF5 文件,每个传感器都有一个表。传感器的数据独立于其他传感器。我的 HDF5 文件的结构如下:

/root
/sens_metadata
/sens1
/sens2
...

每个传感器表都包含一个 datetime[64] 索引、3 列数据和 2 列表示数据置信度的分数。我的问题是以下哪项是更好的编程方式:

选项 1

为每个子进程打开到 HDF5 文件的连接

def parfunc(sens_id):
with pd.HDFStore('data.h5', 'r') as store:
try:
df = store[sens_id]
except KeyError:
pass
else:
# Do work on the df

def main():
import multiprocessing as mp
maxproc = mp.cpu_count()
with pd.HDFStore('data.h5', 'r') as store:
sens_list = store['sens_metadata'].index.tolist()
with mp.Pool(maxproc, maxtaskperchild=100) as p:
ret = p.map(parfunc, sens_list)

选项 2

在主线程中读取传感器,在每次迭代时重新初始化Pool

def parfunc(df):
# Do work on the df

def main():
import multiprocessing as mp
maxproc = mp.cpu_count()
i = 0
df_list, ret = [], []
with pd.HDFStore('data.h5', 'r') as store:
sens_list = store['sens_metadata'].index.tolist()
for sens in sens_list:
try:
df_list.extend([store[sens]])
except KeyError:
pass
else:
if i == maxproc:
with mp.Pool(maxproc) as p:
ret.extend(p.map(parfunc, df_list))
i, df_list = 0, []
i += 1

现在获取一个表的数据大约需要 0.25 秒。但是,该表的大小只会增加,获取数据的时间会越来越长。在单个进程上,处理一张数据表大约需要 1 分钟。

以上哪个选项是执行此操作的更好方法?或者还有其他更好的方法吗?


选项不可能:

传递 HDF5 缓冲区对象是不可能的,因为它不能被 pickle。 (该对象是 WeakValueDictionary 的子对象。)

from functools import partial

def parfunc(hdf_buff, sens_id):
try:
df = hdf_buff[sens_id]
except KeyError:
pass
else:
# Do work on the df

def main():
import multiprocessing as mp
maxproc = mp.cpu_count()
with pd.HDFStore('data.h5', 'r') as store:
sens_list = store['sens_metadata'].index.tolist()
with mp.Pool(maxproc, maxtaskperchild=100) as p:
ret = pd.concat(p.map(partial(parfunc, hdf_buff=store), sens_list))

最佳答案

可能有点旧,但如果有人正在处理一些相关问题,它会有所帮助。我正在阅读有关您不可能的选择的信息,因为我遇到了同样的问题,并且在阅读了此 link 之后.我实现了这样的东西

HDF_LOCK = threading.Lock()
HDF_PATH = 'path'

@contextmanager
def locked_file():
with HDF_LOCK:
with h5py.File(HDF_PATH, 'r') as file:
yield file

def process_files():
with locked_file() as file:
timestamp = file.attrs.get('start_timestamp', 0)
dataset = file.get('series')

with concurrent.futures.ProcessPoolExecutor(10) as executor:
for group, res in ((group, executor.submit(process_groups, group,
timestamp)) for group in dataset):
print(res.result())

process_groups 是我处理 HDF5 文件的函数。您可以在主函数中使用 locked_file() 函数来处理文件。

关于python-3.x - 多处理 HDF5 文件读取,更好的是,多个连接或重复生成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40684896/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com