gpt4 book ai didi

apache-spark - Spark 结构化流中的滞后函数

转载 作者:行者123 更新时间:2023-12-05 07:32:10 24 4
gpt4 key购买 nike

我正在使用 Spark 2.3 结构化流式传输并尝试使用“滞后”功能。然而,结构化流式传输似乎不支持延迟。

val output = spark.sql("SELECT temperature, time, lag(temperature, 1) OVER (ORDER BY time) AS PrevTemp FROM InputTable")

得到这个错误:

org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets; line 1 pos 0;

是否有其他方法可以通过结构化流媒体实现这种“滞后”功能?

谢谢!

最佳答案

据我所知,没有。

您可能会玩 mapGroupsWithState。例如:

    case class PayLoad(event_time: java.sql.Timestamp, data: String)

def mappingFunction(key: java.sql.Timestamp, values: Iterator[PayLoad], state: GroupState[PayLoad]): PayLoad = {
??? // Work with values iterator
}

val temperature: DataFrame = ???
temperature
.withColumn("event_time", org.apache.spark.sql.functions.current_timestamp())
.as[PayLoad]
.groupByKey(_.event_time)
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(mappingFunction)

您不需要保持状态,但通过这种方式您可以访问值迭代器并且您能够解决任何任务。

请记住,在这种情况下,所有微批处理数据都将进入一个分区,并且负载巨大可能会导致巨大的延迟甚至 OOM。 (以及 OVER (ORDER BY time))

希望对您有所帮助。

关于apache-spark - Spark 结构化流中的滞后函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51447797/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com