gpt4 book ai didi

apache-spark - 在 PySpark Structured Streaming 中对多个输出流使用单个流式 DataFrame

转载 作者:行者123 更新时间:2023-12-04 08:22:36 24 4
gpt4 key购买 nike

有一个连续的数据流,在所有转换之后它有下一个模式:

root
|-- message_id: string (nullable = true)
|-- device_id: string (nullable = true)
|-- metric_id: long (nullable = true)
|-- value: long (nullable = true)
|-- timestamp: string (nullable = true)

还有一套规则,即:

if metric_id = 4077 and value > 10 and value < 25

这意味着如果流中的任何行满足该条件,则必须将该行推送到不同的流中。

如何识别满足警报条件(有多个)的消息并将它们推送到不同的流?

最佳答案

Spark Structured Streaming 应用程序允许您使用相同的输入流拥有多个输出流。

这意味着,例如,如果 df 是您的输入流 DataFrame,您可以定义一个 DataFrame 过滤器并将生成的、过滤后的 DataFrame 用于另一个输出流,如下所示:

df = readStream.format(...).options(...).load().select(...)

# create a new DataFrame that only contains alters
alertsDf = df.filter( (df.metric_id == "4077") & (df.value > 10) & (df.value < 45) )

# use both DataFrames for output streams
df.writeStream.format(...).options(...).start()
alertsDf.writeStream.format(...).options(...).start()

spark.streams.awaitTermination()

为了容错,建议为每个输出流分别设置选项 checkpointLocation

关于apache-spark - 在 PySpark Structured Streaming 中对多个输出流使用单个流式 DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65427058/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com