- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
TL;DR:我们在将 Pandas 代码与从同一 HDF 读取和写入的 Dask 并行化时遇到问题
我正在开发一个项目,该项目通常需要三个步骤:读取、翻译(或组合数据)和写入这些数据。就上下文而言,我们正在处理医疗记录,我们收到不同格式的 claim ,将其转换为标准化格式,然后将其重新写入磁盘。理想情况下,我希望以某种形式保存中间数据集,以便稍后可以通过 Python/Pandas 访问。
目前,我选择 HDF 作为我的数据存储格式,但是我遇到了运行时问题。对于大量人口,我的代码目前可能需要几天的时间。这促使我研究 Dask,但我不确定我是否已将 Dask 最适合我的情况。
下面是我的工作流程的一个工作示例,希望有足够的示例数据来了解运行时问题。
读取(在本例中为创建)数据
import pandas as pd
import numpy as np
import dask
from dask import delayed
from dask import dataframe as dd
import random
from datetime import timedelta
from pandas.io.pytables import HDFStore
member_id = range(1, 10000)
window_start_date = pd.to_datetime('2015-01-01')
start_date_col = [window_start_date + timedelta(days=random.randint(0, 730)) for i in member_id]
# Eligibility records
eligibility = pd.DataFrame({'member_id': member_id,
'start_date': start_date_col})
eligibility['end_date'] = eligibility['start_date'] + timedelta(days=365)
eligibility['insurance_type'] = np.random.choice(['HMO', 'PPO'], len(member_id), p=[0.4, 0.6])
eligibility['gender'] = np.random.choice(['F', 'M'], len(member_id), p=[0.6, 0.4])
(eligibility.set_index('member_id')
.to_hdf('test_data.h5',
key='eligibility',
format='table'))
# Inpatient records
inpatient_record_number = range(1, 20000)
service_date = [window_start_date + timedelta(days=random.randint(0, 730)) for i in inpatient_record_number]
inpatient = pd.DataFrame({'inpatient_record_number': inpatient_record_number,
'service_date': service_date})
inpatient['member_id'] = np.random.choice(list(range(1, 10000)), len(inpatient_record_number))
inpatient['procedure'] = np.random.choice(['A', 'B', 'C', 'D'], len(inpatient_record_number))
(inpatient.set_index('member_id')
.to_hdf('test_data.h5',
key='inpatient',
format='table'))
# Outpatient records
outpatient_record_number = range(1, 30000)
service_date = [window_start_date + timedelta(days=random.randint(0, 730)) for i in outpatient_record_number]
outpatient = pd.DataFrame({'outpatient_record_number': outpatient_record_number,
'service_date': service_date})
outpatient['member_id'] = np.random.choice(range(1, 10000), len(outpatient_record_number))
outpatient['procedure'] = np.random.choice(['A', 'B', 'C', 'D'], len(outpatient_record_number))
(outpatient.set_index('member_id')
.to_hdf('test_data.h5',
key='outpatient',
format='table'))
翻译/写入数据
顺序方法
def pull_member_data(member_i):
inpatient_slice = pd.read_hdf('test_data.h5', 'inpatient', where='index == "{}"'.format(member_i))
outpatient_slice = pd.read_hdf('test_data.h5', 'outpatient', where='index == "{}"'.format(member_i))
return inpatient_slice, outpatient_slice
def create_visits(inpatient_slice, outpatient_slice):
# In reality this is more complicated, using some logic to combine inpatient/outpatient/ER into medical 'visits'
# But for simplicity, we'll just stack the inpatient/outpatient and assign a record identifier
visits_stacked = pd.concat([inpatient_slice, outpatient_slice]).reset_index().sort_values('service_date')
visits_stacked.insert(0, 'visit_id', range(1, len(visits_stacked) + 1))
return visits_stacked
def save_visits_to_hdf(visits_slice):
with HDFStore('test_data.h5', mode='a') as store:
store.append('visits', visits_slice)
# Read in the data by member_id, perform some operation
def translate_by_member(member_i):
inpatient_slice, outpatient_slice = pull_member_data(member_i)
visits_slice = create_visits(inpatient_slice, outpatient_slice)
save_visits_to_hdf(visits_slice)
def run_translate_sequential():
# Simple approach: Loop through each member sequentially
for member_i in member_id:
translate_by_member(member_i)
run_translate_sequential()
上面的代码在我的机器上运行大约需要 9 分钟。
Dask 方法
def create_visits_dask_version(visits_stacked):
# In reality this is more complicated, using some logic to combine inpatient/outpatient/ER
# But for simplicity, we'll just stack the inpatient/outpatient and assign a record identifier
len_of_visits = visits_stacked.shape[0]
visits_stacked_1 = (visits_stacked
.sort_values('service_date')
.assign(visit_id=range(1, len_of_visits + 1))
.set_index('visit_id')
)
return visits_stacked_1
def run_translate_dask():
# Approach 2: Dask, with individual writes to HDF
inpatient_dask = dd.read_hdf('test_data.h5', 'inpatient')
outpatient_dask = dd.read_hdf('test_data.h5', 'outpatient')
stacked = dd.concat([inpatient_dask, outpatient_dask])
visits = stacked.groupby('member_id').apply(create_visits_dask_version)
visits.to_hdf('test_data_dask.h5', 'visits')
run_translate_dask()
此 Dask 方法需要 13 秒(!)
虽然这是一个很大的改进,但我们通常对以下几件事感到好奇:
鉴于这个简单的示例,使用 Dask 数据帧、连接它们以及使用 groupby/apply 的方法是最佳方法吗?
实际上,我们有多个这样的进程,它们从同一个 HDF 读取数据,并写入同一个 HDF。我们最初的代码库的结构方式允许一次运行一个 member_id
整个工作流程。当我们尝试并行化它们时,它有时适用于小样本,但大多数时候会产生分割错误。像这样的并行工作流程(使用 HDF 进行读/写)是否存在已知问题?我们也在努力制作一个这样的示例,但我们认为我们应该将其发布在此处,以防触发建议(或者如果此代码可以帮助面临类似问题的人)。
感谢任何和所有反馈!
最佳答案
一般来说,groupby-apply 会相当慢。使用这样的数据通常具有挑战性,尤其是在有限的内存中。
一般来说,我建议使用 Parquet 格式(dask.dataframe 具有 to_ 和 read_parquet 函数)。与 HDF 文件相比,出现段错误的可能性要小得多。
关于python - 使用 Dask 并行化 HDF 读取-翻译-写入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46497323/
翻译自官方wiki: https://github.com/facebook/rocksdb/wiki/Write-Stalls 转载请注明出处: https://www.cnblogs.c
译者注:在微服务架构设计,构建API和服务间通信技术选型时,对 REST 和 gRPC 的理解和应用还存在知识盲区,近期看到国外的这篇文章: A detailed comparison of
rocksdb调试指引 翻译自官方wiki: https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide 转载请注明出处: h
传统的ASP.NET Web Forms是一个非常好的主意,但现实需求非常复杂。随着时间的推移,现实世界的项目暴露出Web Forms的一些不足之处: “沉重的”视图状态:现实中在http请求之间
翻译自:Top 10 questions of Java Strings 简单地说,”==”测试两个字符串的引用是否相同,equals()测试两个字符串的值是否相同。除非你希望检
你好,今天我要和大家分享一些东西,举例来说这个在JavaScript中用的很多。我要讲讲回调(callbacks)。你知道什么时候用,怎么用这个吗?你真的理解了它在java环境中的用法了吗?当我也问
Java多线程面试问题 1. 进程和线程之间有什么不同? 一个进程是一个独立(self contained)的运行环境,它可以被看作一个程序或者一个应用。而线程是在进程中执行的一个
原文: [A Dive into .Net 8 Native AOT and Efficient Web Development] 作者: [sharmila subbiah] 引言 随着 .NE
这是Fiddle 是否可以在 angular-translate 中检查其他语言的键值是否可用,然后它可以从其他语言中提取该键值? 就像在示例中,我有英语和西类牙语。并且一个键值(例如“CONFIRM
我希望能够使用 $this->__('String to translate')在外部脚本中。我该怎么做呢? Magento 版本 1.5.1.0 . 最佳答案 我认为设置语言环境的正确方法是: Ma
我有一个开关小部件,它使用自定义数据属性值来标记自己。 .switch.switch-text .switch-label::before { right: 1px; color: #c2cf
是否有人遇到过这样的情况:用 Java 编写并由(例如)法国程序员编写的现有代码库必须转换为英语程序员可以理解的代码?这里的问题是变量/方法/类名称、注释等都将采用该特定语言。 现在有可用的自动化解决
维基百科和其他一些网站将解释器描述为将代码从某种高级语言翻译成某种低级语言的翻译器。然而,有很多解释,包括在 stackoverflow 中,它说解释器直接执行作为输入的指令,而无需事先转换。那么解释
我想将基本动画应用于自定义单元格中的某些元素,例如标签、图像:特别是,我想让这些动画在我触摸单元格内部时也启动。我是初学者,我只学会了使用 animateWithDuration 和 transiti
这个问题在这里已经有了答案: NSDateFormatter and current language in iOS11 (5 个回答) 已关闭 3 年前。 当使用这样的 DateComponentF
我想在点击 var about 时移动 div.willshow。但我单击那个 btn,只有它获得类 active。然后我再次单击那个 btn 它失去了类。如果我再点击一次,每项任务都无法正常工作。
我想要一个按钮在悬停时向下移动几个像素,但它又回来了。当您还在上面徘徊时,它不应该留在原处吗? Email Me .btn {background: #2ecc71; padding: .5em 1e
在我的应用程序中,我想添加功能将页面翻译为用户在浏览器中设置的所有语言,如果没有可用的语言,则翻译为默认英语...问题是浏览器与语言支持不一致。我找到了一个解决方法,我对一些返回用户语言的 Web 服
我的应用程序有一个 Help.htm 文件,用谷歌翻译翻译得相当好。我想将菜单项标记为“请勿翻译”,但我发现并尝试过的 HTML 标签都不起作用。对于以下内容,我使用了谷歌翻译网站 - 它翻译了我没想
我有以下代码: span { width:200px; height:100px; background-color:red; border:1px solid black; } span.c2 {
我是一名优秀的程序员,十分优秀!