gpt4 book ai didi

python - 使用非唯一索引列日期在 Dask 数据框中提取最新值

转载 作者:行者123 更新时间:2023-12-05 03:33:06 25 4
gpt4 key购买 nike

我对 pandas dataframes 非常熟悉,但我对 Dask 还很陌生,所以我仍在努力研究并行化我的代码。我已经使用 pandas 和 pandarallel 获得了我想要的结果,所以我想弄清楚的是我是否可以使用 Dask 扩大任务或以某种方式加速它。

假设我的数据框具有作为非唯一索引的日期时间、一个值列和一个 ID 列。

time                        value   id
2021-01-01 00:00:00.210281 28.08 293707
2021-01-01 00:00:00.279228 28.07 293708
2021-01-01 00:00:00.697341 28.08 293709
2021-01-01 00:00:00.941704 28.08 293710
2021-01-01 00:00:00.945422 28.07 293711
... ... ...
2021-01-01 23:59:59.288914 29.84 512665
2021-01-01 23:59:59.288914 29.83 512666
2021-01-01 23:59:59.288914 29.82 512667
2021-01-01 23:59:59.525227 29.84 512668
2021-01-01 23:59:59.784754 29.84 512669

我要提取的是每一秒的最新值。例如如果之前价格合适2021-01-01 00:00:01是索引为 2021-01-01 00:00:00.945422 的行最新值为 28.07 .

就我而言,有时索引值不是唯一的,因此作为决胜局,我想使用 id柱子。 id 最大的值number 将被视为最新值。对于当时三个值并列的情况 2021-01-01 23:59:59.288914 , 值 29.82将被选择,因为最大的 id该日期将是 512667 .另请注意 id在整个数据集中并不一致,我不能只依赖它来排序我的数据。

在 pandas 中,我只是通过获取最后一个索引来做到这一点

last_index = df.loc[date_minus60: date_curr].index[-1]
last_values = df.loc[last_index]

然后如果 last_values.index.is_unique 的值是假的,我终于执行last_values.sort_values('id').iloc[-1] .

我一直很难将此代码转换为 Dask,遇到有关我的延迟函数的问题,导致他们需要计算才能再次重新索引我的数据帧。

我想知道是否有处理此类问题的最佳实践。

最佳答案

@Kafkaesque 这是考虑使用 map_partitions 的另一种方法,它将自定义函数映射到每个分区,将每个分区视为 Pandas DataFrame。通常,建议直接使用 dask.dataframe 方法。然而,在这种情况下,dask.DataFrame.sort_values 仅支持按单列排序,因此 map_partitions 是一个不错的选择。您还可以找到 these Dask Groupby examples有帮助。

值得注意的是,使用 map_partitions + groupby 仅在您的数据集已经排序时才有效,这样相同的秒数位于相同的分区中。以下示例针对数据未排序的情况:

import dask
import dask.dataframe as dd
import pandas as pd

# example dataset, use sample() to "unsort"
ddf = dask.datasets.timeseries(
freq="250ms", partition_freq="5d", seed=42
).sample(frac=0.9, replace=True, random_state=42)

# first set the rounded timestamp as the index before calling map_partitions
# (don't need to reset the index if your dataset is already sorted)
ddf = ddf.reset_index()
ddf = ddf.assign(round_timestamp=ddf['timestamp'].dt.floor('S')).set_index('round_timestamp')

def custom_func(df):
return (
df
.sort_values(by=['timestamp', 'id'])
.groupby('round_timestamp')
.last()
)

new_ddf = ddf.map_partitions(custom_func)

# shows embarrassingly parallel execution of 'custom_func' across each partition
new_ddf.visualize(optimize_graph=True)

# check the result of the first partition
new_ddf.partitions[0].compute()

关于python - 使用非唯一索引列日期在 Dask 数据框中提取最新值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70374896/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com