
pyspark: Interpolation of missing values in pyspark dataframe observed


I am trying to use Spark to clean up a time-series dataset that is not fully populated and is fairly large.

What I would like to do is transform the following dataset

Group | TS         | Value
___________________________
A     | 01-01-2018 | 1
A     | 01-02-2018 | 2
A     | 01-03-2018 |
A     | 01-04-2018 |
A     | 01-05-2018 | 5
A     | 01-06-2018 |
A     | 01-07-2018 | 10
A     | 01-08-2018 | 11

into the following:

Group | TS         | Value
___________________________
A     | 01-01-2018 | 1
A     | 01-02-2018 | 2
A     | 01-03-2018 | 3
A     | 01-04-2018 | 4
A     | 01-05-2018 | 5
A     | 01-06-2018 | 7.5
A     | 01-07-2018 | 10
A     | 01-08-2018 | 11
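To make the fill rule explicit: each gap is filled linearly between its bounding values. For example, 01-06-2018 sits one step into the two-step gap between 5 and 10, so its filled value is 5 + (10 - 5) / 2 * 1 = 7.5; likewise the 01-03/01-04 gap between 2 and 5 fills as 3 and 4.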

Any help would be greatly appreciated.

Best Answer

After chatting with @ndricca, I updated the code with @leo's suggestion.

First, the DataFrame creation:

from pyspark.sql import functions as F
from pyspark.sql import Window

# Sample time series with gaps (None) in the Value column
data = [
    ("A", "01-01-2018", 1),
    ("A", "01-02-2018", 2),
    ("A", "01-03-2018", None),
    ("A", "01-04-2018", None),
    ("A", "01-05-2018", 5),
    ("A", "01-06-2018", None),
    ("A", "01-07-2018", 10),
    ("A", "01-08-2018", 11),
]
df = spark.createDataFrame(data, ['Group', 'TS', 'Value'])
# Parse the MM-dd-yyyy strings into real timestamps so ordering is chronological
df = df.withColumn('TS', F.unix_timestamp('TS', 'MM-dd-yyyy').cast('timestamp'))
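
To sanity-check the parsing before interpolating (purely an optional verification step), you can inspect the schema and the rows:

df.printSchema()
df.orderBy('TS').show(truncate=False)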

Next, the updated function:

def fill_linear_interpolation(df, id_cols, order_col, value_col):
    """
    Apply linear interpolation to a dataframe to fill gaps.

    :param df: spark dataframe
    :param id_cols: string or list of column names to partition the window function by
    :param order_col: column to order by in the window function
    :param value_col: column to be filled

    :returns: spark dataframe updated with interpolated values
    """
    # normalize id_cols to a list so partitionBy behaves consistently
    if not isinstance(id_cols, list):
        id_cols = [id_cols]

    # create a row number over the window and a column with the row number only for non-missing values
    w = Window.partitionBy(id_cols).orderBy(order_col)
    new_df = df.withColumn('rn', F.row_number().over(w))
    new_df = new_df.withColumn('rn_not_null', F.when(F.col(value_col).isNotNull(), F.col('rn')))

    # create relative references to the start value (last non-missing value before the gap)
    w_start = Window.partitionBy(id_cols).orderBy(order_col).rowsBetween(Window.unboundedPreceding, -1)
    new_df = new_df.withColumn('start_val', F.last(value_col, True).over(w_start))
    new_df = new_df.withColumn('start_rn', F.last('rn_not_null', True).over(w_start))

    # create relative references to the end value (first non-missing value after the gap)
    w_end = Window.partitionBy(id_cols).orderBy(order_col).rowsBetween(0, Window.unboundedFollowing)
    new_df = new_df.withColumn('end_val', F.first(value_col, True).over(w_end))
    new_df = new_df.withColumn('end_rn', F.first('rn_not_null', True).over(w_end))

    # create references to the gap length and the current position within the gap
    new_df = new_df.withColumn('diff_rn', F.col('end_rn') - F.col('start_rn'))
    new_df = new_df.withColumn('curr_rn', F.col('diff_rn') - (F.col('end_rn') - F.col('rn')))

    # calculate the linear interpolation value and fill only the missing rows
    lin_interp_func = (F.col('start_val') + (F.col('end_val') - F.col('start_val')) / F.col('diff_rn') * F.col('curr_rn'))
    new_df = new_df.withColumn(value_col, F.when(F.col(value_col).isNull(), lin_interp_func).otherwise(F.col(value_col)))

    new_df = new_df.drop('rn', 'rn_not_null', 'start_val', 'end_val', 'start_rn', 'end_rn', 'diff_rn', 'curr_rn')
    return new_df

Then run the function on our DataFrame:

new_df = fill_linear_interpolation(df=df,id_cols='Group',order_col='TS',value_col='Value')
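
Sorting the result should reproduce the target table from the question. One caveat: because the interpolated values are doubles, Spark should promote the whole Value column to double, so the originally integer rows display as 1.0, 2.0, and so on:

new_df.orderBy('TS').show()
# Expected Value column in TS order: 1.0, 2.0, 3.0, 4.0, 5.0, 7.5, 10.0, 11.0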

I also checked it on the df from my -> post; you must create the extra group column first.
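If your frame holds only a single series and has no natural Group column, a minimal sketch of that extra step, assuming a constant placeholder group is acceptable, is:

df = df.withColumn('Group', F.lit('A'))  # hypothetical constant group label, just to satisfy partitionBy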

Regarding "pyspark: Interpolation of missing values in pyspark dataframe observed", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53077639/
