gpt4 book ai didi

python - 使用 Dask 并行化 HDF 读取-翻译-写入

转载 作者:太空宇宙 更新时间:2023-11-03 14:42:14 24 4
gpt4 key购买 nike

TL;DR:我们在将 Pandas 代码与从同一 HDF 读取和写入的 Dask 并行化时遇到问题

我正在开发一个项目,该项目通常需要三个步骤:读取、翻译(或组合数据)和写入这些数据。就上下文而言,我们正在处理医疗记录,我们收到不同格式的 claim ,将其转换为标准化格式,然后将其重新写入磁盘。理想情况下,我希望以某种形式保存中间数据集,以便稍后可以通过 Python/Pandas 访问。

目前,我选择 HDF 作为我的数据存储格式,但是我遇到了运行时问题。对于大量人口,我的代码目前可能需要几天的时间。这促使我研究 Dask,但我不确定我是否已将 Dask 最适合我的情况。

下面是我的工作流程的一个工作示例,希望有足够的示例数据来了解运行时问题。

读取(在本例中为创建)数据

import pandas as pd
import numpy as np
import dask
from dask import delayed
from dask import dataframe as dd
import random
from datetime import timedelta
from pandas.io.pytables import HDFStore

member_id = range(1, 10000)
window_start_date = pd.to_datetime('2015-01-01')
start_date_col = [window_start_date + timedelta(days=random.randint(0, 730)) for i in member_id]

# Eligibility records
eligibility = pd.DataFrame({'member_id': member_id,
'start_date': start_date_col})
eligibility['end_date'] = eligibility['start_date'] + timedelta(days=365)
eligibility['insurance_type'] = np.random.choice(['HMO', 'PPO'], len(member_id), p=[0.4, 0.6])
eligibility['gender'] = np.random.choice(['F', 'M'], len(member_id), p=[0.6, 0.4])
(eligibility.set_index('member_id')
.to_hdf('test_data.h5',
key='eligibility',
format='table'))

# Inpatient records
inpatient_record_number = range(1, 20000)
service_date = [window_start_date + timedelta(days=random.randint(0, 730)) for i in inpatient_record_number]
inpatient = pd.DataFrame({'inpatient_record_number': inpatient_record_number,
'service_date': service_date})
inpatient['member_id'] = np.random.choice(list(range(1, 10000)), len(inpatient_record_number))
inpatient['procedure'] = np.random.choice(['A', 'B', 'C', 'D'], len(inpatient_record_number))
(inpatient.set_index('member_id')
.to_hdf('test_data.h5',
key='inpatient',
format='table'))

# Outpatient records
outpatient_record_number = range(1, 30000)
service_date = [window_start_date + timedelta(days=random.randint(0, 730)) for i in outpatient_record_number]
outpatient = pd.DataFrame({'outpatient_record_number': outpatient_record_number,
'service_date': service_date})
outpatient['member_id'] = np.random.choice(range(1, 10000), len(outpatient_record_number))
outpatient['procedure'] = np.random.choice(['A', 'B', 'C', 'D'], len(outpatient_record_number))
(outpatient.set_index('member_id')
.to_hdf('test_data.h5',
key='outpatient',
format='table'))

翻译/写入数据

顺序方法

def pull_member_data(member_i):
inpatient_slice = pd.read_hdf('test_data.h5', 'inpatient', where='index == "{}"'.format(member_i))
outpatient_slice = pd.read_hdf('test_data.h5', 'outpatient', where='index == "{}"'.format(member_i))
return inpatient_slice, outpatient_slice


def create_visits(inpatient_slice, outpatient_slice):
# In reality this is more complicated, using some logic to combine inpatient/outpatient/ER into medical 'visits'
# But for simplicity, we'll just stack the inpatient/outpatient and assign a record identifier
visits_stacked = pd.concat([inpatient_slice, outpatient_slice]).reset_index().sort_values('service_date')
visits_stacked.insert(0, 'visit_id', range(1, len(visits_stacked) + 1))
return visits_stacked


def save_visits_to_hdf(visits_slice):
with HDFStore('test_data.h5', mode='a') as store:
store.append('visits', visits_slice)


# Read in the data by member_id, perform some operation
def translate_by_member(member_i):
inpatient_slice, outpatient_slice = pull_member_data(member_i)
visits_slice = create_visits(inpatient_slice, outpatient_slice)
save_visits_to_hdf(visits_slice)


def run_translate_sequential():
# Simple approach: Loop through each member sequentially
for member_i in member_id:
translate_by_member(member_i)

run_translate_sequential()

上面的代码在我的机器上运行大约需要 9 分钟。

Dask 方法

def create_visits_dask_version(visits_stacked):
# In reality this is more complicated, using some logic to combine inpatient/outpatient/ER
# But for simplicity, we'll just stack the inpatient/outpatient and assign a record identifier
len_of_visits = visits_stacked.shape[0]
visits_stacked_1 = (visits_stacked
.sort_values('service_date')
.assign(visit_id=range(1, len_of_visits + 1))
.set_index('visit_id')
)
return visits_stacked_1


def run_translate_dask():
# Approach 2: Dask, with individual writes to HDF
inpatient_dask = dd.read_hdf('test_data.h5', 'inpatient')
outpatient_dask = dd.read_hdf('test_data.h5', 'outpatient')
stacked = dd.concat([inpatient_dask, outpatient_dask])
visits = stacked.groupby('member_id').apply(create_visits_dask_version)
visits.to_hdf('test_data_dask.h5', 'visits')

run_translate_dask()

此 Dask 方法需要 13 秒(!)

虽然这是一个很大的改进,但我们通常对以下几件事感到好奇:

  1. 鉴于这个简单的示例,使用 Dask 数据帧、连接它们以及使用 groupby/apply 的方法是最佳方法吗?

  2. 实际上,我们有多个这样的进程,它们从同一个 HDF 读取数据,并写入同一个 HDF。我们最初的代码库的结构方式允许一次运行一个 member_id 整个工作流程。当我们尝试并行化它们时,它有时适用于小样本,但大多数时候会产生分割错误。像这样的并行工作流程(使用 HDF 进行读/写)是否存在已知问题?我们也在努力制作一个这样的示例,但我们认为我们应该将其发布在此处,以防触发建议(或者如果此代码可以帮助面临类似问题的人)。

感谢任何和所有反馈!

最佳答案

一般来说,groupby-apply 会相当慢。使用这样的数据通常具有挑战性,尤其是在有限的内存中。

一般来说,我建议使用 Parquet 格式(dask.dataframe 具有 to_ 和 read_parquet 函数)。与 HDF 文件相比,出现段错误的可能性要小得多。

关于python - 使用 Dask 并行化 HDF 读取-翻译-写入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46497323/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com