gpt4 book ai didi

apache-spark - Pyspark:在窗口内使用 udf

转载 作者:行者123 更新时间:2023-12-05 05:17:14 24 4
gpt4 key购买 nike

我需要使用 Pyspark 检测时间序列的阈值。在下面的示例图中,我想检测(通过存储关联的时间戳)每次出现的参数 ALT_STD 大于 5000 然后小于 5000。

ALT_STD vs Time

对于这个简单的案例,我可以运行简单的查询,例如

t_start = df.select('timestamp')\
.filter(df.ALT_STD > 5000)\
.sort('timestamp')\
.first()
t_stop = df.select('timestamp')\
.filter((df.ALT_STD < 5000)\
& (df.timestamp > t_start.timestamp))\
.sort('timestamp')\
.first()

但是,在某些情况下,事件可能是周期性的,我可能有几条曲线(即多次 ALT_STD 将升高到 5000 以上或以下)。当然,如果我使用上面的查询,我将只能检测到第一次出现的情况。

我想我应该将窗口函数与 udf 一起使用,但我找不到可行的解决方案。我的猜测是算法应该是这样的:

windowSpec = Window.partitionBy('flight_hash')\
.orderBy('timestamp')\
.rowsBetween(Window.currentRow, 1)

def detect_thresholds(x):
if (x['ALT_STD'][current_row]< 5000) and (x['ALT_STD'][next_row] > 5000):
return x['timestamp'] #Or maybe simply 1
if (x['ALT_STD'][current_row]> 5000) and (x['ALT_STD'][current_row] > 5000):
return x['timestamp'] #Or maybe simply 2
else:
return 0

import pyspark.sql.functions as F
detect_udf = F.udf(detect_threshold, IntegerType())
df.withColumn('Result', detect_udf(F.Struct('ALT_STD')).over(windowSpec).show()

这样的算法在 Pyspark 中可行吗?怎么样?

后记:作为旁注,我已经了解如何使用 udf 或 udf 和内置的 sql 窗口函数,但不知道如何组合 udf 和窗口。例如:

# This will compute the mean (built-in function)
df.withColumn("Result", F.mean(df['ALT_STD']).over(windowSpec)).show()

# This will also work
divide_udf = F.udf(lambda x: x[0]/1000., DoubleType())
df.withColumn('result', divide_udf(F.struct('timestamp')))

最佳答案

这里不需要udf(而且python udfs不能作为窗口函数使用)。只需将 lead/lagwhen 一起使用:

from pyspark.sql.functions import col, lag, lead, when

result = (when((col('ALT_STD') < 5000) & (lead(col('ALT_STD'), 1) > 5000), 1)
.when(col('ALT_STD') > 5000) & (lead(col('ALT_STD'), 1) < 5000), 1)
.otherwise(0))

df.withColum("result", result)

关于apache-spark - Pyspark:在窗口内使用 udf,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49556117/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com