gpt4 book ai didi

apache-spark - Spark 流 : select record with max timestamp for each id in dataframe (pyspark)

转载 作者:行者123 更新时间:2023-12-05 06:33:02 26 4
gpt4 key购买 nike

我有一个带有模式的数据框 -

 |-- record_id: integer (nullable = true)
|-- Data1: string (nullable = true)
|-- Data2: string (nullable = true)
|-- Data3: string (nullable = true)
|-- Time: timestamp (nullable = true)

我想检索数据中的最后一条记录,按 record_id 和最大时间戳分组。

所以,如果数据最初是这样的:

 +----------+---------+---------+---------+-----------------------+
|record_id |Data1 |Data2 |Data3 | Time|
+----------+---------+-------------------------------------------+
| 1 | aaa | null | null | 2018-06-04 21:51:53.0 |
| 1 | null | bbbb | cccc | 2018-06-05 21:51:53.0 |
| 1 | aaa | null | dddd | 2018-06-06 21:51:53.0 |
| 1 | qqqq | wwww | eeee | 2018-06-07 21:51:53.0 |
| 2 | aaa | null | null | 2018-06-04 21:51:53.0 |
| 2 | aaaa | bbbb | cccc | 2018-06-05 21:51:53.0 |
| 3 | aaa | null | dddd | 2018-06-06 21:51:53.0 |
| 3 | aaaa | bbbb | eeee | 2018-06-08 21:51:53.0 |

我想要的输出是

 +----------+---------+---------+---------+-----------------------+
|record_id |Data1 |Data2 |Data3 | Time|
+----------+---------+-------------------------------------------+
| 1 | qqqq | wwww | eeee | 2018-06-07 21:51:53.0 |
| 2 | aaaa | bbbb | cccc | 2018-06-05 21:51:53.0 |
| 3 | aaaa | bbbb | eeee | 2018-06-08 21:51:53.0 |

我尝试在同一个流中加入 2 个查询,类似于答案 here .我的代码(其中 df1 是原始数据框):

df1=df1.withWatermark("Timetemp", "2 seconds")
df1.createOrReplaceTempView("tbl")
df1.printSchema()
query="select t.record_id as record_id, max(t.Timetemp) as Timetemp from tbl t group by t.record_id"
df2=spark.sql(query)
df2=df2.withWatermark("Timetemp", "2 seconds")

qws=df1.alias('a').join(df2.alias('b'),((col('a.record_id')==col('b.record_id')) & (col("a.Timetemp")==col("b.Timetemp"))))

query = qws.writeStream.outputMode('append').format('console').start()

query.awaitTermination()

不过,我一直收到这个错误:

Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;

当有明显水印时。可以做什么?我无法使用窗口化,因为流媒体不支持非基于时间的窗口化。

最佳答案

我也有同样的任务。尝试了几个选项,将 current_timestamp 列添加到数据集,并按窗口和带水印的记录 ID 进行分组,但没有任何效果。

据我所知,没有可用于解决此任务的 API。具有分区依据和排序的窗口不适用于流式数据集。

我使用 MapGroupWithState API 解决了这个任务,但没有保持如下状态:

import spark.implicits._

val stream = spark.readStream
.option("maxFileAge", "24h")
.option("maxFilesPerTrigger", "1000")
.parquet(sourcePath)
.as[input.Data]

val mostRecentRowPerPrimaryKey =
stream
.groupByKey(_.id)
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(takeMostRecentOnly)

mostRecentRowPerPrimaryKey
.repartition(5)
.writeStream
.option("checkpointLocation", s"${streamingConfig.checkpointBasePath}/$streamName")
.option("truncate", "false")
.format("console")
.outputMode(OutputMode.Update())
.trigger(Trigger.ProcessingTime(60.seconds))
.queryName(streamName)
.start()

def takeMostRecentOnly(pk: Long, values: Iterator[input.Data], state: GroupState[input.Data]): input.Data = {
values.maxBy(_.last_modified)
}

注意:这仅适用于Update 模式。

希望对您有所帮助!

关于apache-spark - Spark 流 : select record with max timestamp for each id in dataframe (pyspark),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50933606/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com