- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个带有模式的数据框 -
|-- record_id: integer (nullable = true)
|-- Data1: string (nullable = true)
|-- Data2: string (nullable = true)
|-- Data3: string (nullable = true)
|-- Time: timestamp (nullable = true)
我想检索数据中的最后一条记录,按 record_id 和最大时间戳分组。
所以,如果数据最初是这样的:
+----------+---------+---------+---------+-----------------------+
|record_id |Data1 |Data2 |Data3 | Time|
+----------+---------+-------------------------------------------+
| 1 | aaa | null | null | 2018-06-04 21:51:53.0 |
| 1 | null | bbbb | cccc | 2018-06-05 21:51:53.0 |
| 1 | aaa | null | dddd | 2018-06-06 21:51:53.0 |
| 1 | qqqq | wwww | eeee | 2018-06-07 21:51:53.0 |
| 2 | aaa | null | null | 2018-06-04 21:51:53.0 |
| 2 | aaaa | bbbb | cccc | 2018-06-05 21:51:53.0 |
| 3 | aaa | null | dddd | 2018-06-06 21:51:53.0 |
| 3 | aaaa | bbbb | eeee | 2018-06-08 21:51:53.0 |
我想要的输出是
+----------+---------+---------+---------+-----------------------+
|record_id |Data1 |Data2 |Data3 | Time|
+----------+---------+-------------------------------------------+
| 1 | qqqq | wwww | eeee | 2018-06-07 21:51:53.0 |
| 2 | aaaa | bbbb | cccc | 2018-06-05 21:51:53.0 |
| 3 | aaaa | bbbb | eeee | 2018-06-08 21:51:53.0 |
我尝试在同一个流中加入 2 个查询,类似于答案 here .我的代码(其中 df1 是原始数据框):
df1=df1.withWatermark("Timetemp", "2 seconds")
df1.createOrReplaceTempView("tbl")
df1.printSchema()
query="select t.record_id as record_id, max(t.Timetemp) as Timetemp from tbl t group by t.record_id"
df2=spark.sql(query)
df2=df2.withWatermark("Timetemp", "2 seconds")
qws=df1.alias('a').join(df2.alias('b'),((col('a.record_id')==col('b.record_id')) & (col("a.Timetemp")==col("b.Timetemp"))))
query = qws.writeStream.outputMode('append').format('console').start()
query.awaitTermination()
不过,我一直收到这个错误:
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
当有明显水印时。可以做什么?我无法使用窗口化,因为流媒体不支持非基于时间的窗口化。
最佳答案
我也有同样的任务。尝试了几个选项,将 current_timestamp
列添加到数据集,并按窗口和带水印的记录 ID 进行分组,但没有任何效果。
据我所知,没有可用于解决此任务的 API。具有分区依据和排序的窗口不适用于流式数据集。
我使用 MapGroupWithState
API 解决了这个任务,但没有保持如下状态:
import spark.implicits._
val stream = spark.readStream
.option("maxFileAge", "24h")
.option("maxFilesPerTrigger", "1000")
.parquet(sourcePath)
.as[input.Data]
val mostRecentRowPerPrimaryKey =
stream
.groupByKey(_.id)
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(takeMostRecentOnly)
mostRecentRowPerPrimaryKey
.repartition(5)
.writeStream
.option("checkpointLocation", s"${streamingConfig.checkpointBasePath}/$streamName")
.option("truncate", "false")
.format("console")
.outputMode(OutputMode.Update())
.trigger(Trigger.ProcessingTime(60.seconds))
.queryName(streamName)
.start()
def takeMostRecentOnly(pk: Long, values: Iterator[input.Data], state: GroupState[input.Data]): input.Data = {
values.maxBy(_.last_modified)
}
注意:这仅适用于Update
模式。
希望对您有所帮助!
关于apache-spark - Spark 流 : select record with max timestamp for each id in dataframe (pyspark),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50933606/
快速且可能简单的 Lambda 问题: 我有一家有评论的餐厅。我想查询具有以下内容的那个: 最大(平均评分) 和 Max(ReviewCount) 和 Max(NewestReviewDate) 和
在尝试使用 C++17 折叠表达式时,我尝试实现 max sizeof ,其中结果是类型 sizeof 的最大值。我有一个使用变量和 lambda 的丑陋折叠版本,但我想不出一种使用折叠表达式和 st
我目前正在使用 C 并遇到了一些我觉得有趣的东西,但似乎在这里找不到任何类似的东西。 我正在为数组(大小 1000000)静态分配内存。我知道这相当大并且有可能引起问题。但是,使用 10^6 不会出现
我有一个具有 max-height 的 div 和其中的图像,应该使用 max-width:100% 和 max-height:100%。在 Chromium 中,这是可行的,但 Firefox 仅使
我有一个最大高度的 div 和里面的一个图像,它应该使用最大宽度:100% 和最大高度:100%。在 Chromium 中,这是可行的,但 Firefox 仅使用最大宽度而忽略最大高度。 div#ov
在一本在线 awk 手册中我找到了例子awk '{ if (NF > max) max = NF } END { print max }' 该程序打印任何输入行上的最大字段数。但我不明白 awk 如何
按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我正在制作一个非循环图数据库。 表 Material (id_item,id_collection,...)主键(id_item,id_collection) (item可以是collection本身
我有以下两个表。 1.电影详情(电影ID、电影名称、评分、票数、年份) 2.电影类型(Movie-ID,Genre) 我正在使用以下查询来执行连接并获得每个评分最高的电影流派。 select Movi
我有一个查询,我想返回 idevent 中给定传感器 ID (sensorID) 范围内的最高 ID 值,但是查询没有返回最高值。 我运行查询时减去 max() 语句的结果: mysql> SELEC
SUM(MAX() + MAX()) 有正确的方法吗? 这是我一直在努力做的事情 SELECT SUM(MAX(account.BALANCE1) + MAX(account.BALANCE2))
这个问题类似于CSS media queries: max-width OR max-height , 但由于我的代表不够高,我无法在回复中添加评论(问题),我想在原始问题中添加。 与其他主题中的发帖
Jon Skeet今天报告(source): Math.Max(1f, float.NaN) == NaN new[] { 1f, float.NaN }.Max() == 1f 为什么? 编辑:双倍
这个问题已经有答案了: Java 8 stream's .min() and .max(): why does this compile? (5 个回答) 已关闭 7 年前。 我正在学习1z0-809
我在处理一些数据库记录时遇到了一些挑战。 我需要为特定列获取具有 MAX 值的行,并且这些记录必须介于两个时间戳值之间。 这是SQL查询 SELECT id, MAX(amount), created
我想在媒体查询中使用 AND 条件。我使用了下面的代码,但是没有用 @media screen and (max-width: 995px AND max-height: 700px) { } 最佳答
在编写 CSS 媒体查询时,有什么方法可以用“或”逻辑指定多个条件吗? 我正在尝试做这样的事情: /* This doesn't work */ @media screen and (max-widt
我对仅使用 max(list array) 和 np.max(list array) 之间的区别有疑问。 这里唯一的区别是 Python 返回代码所需的时间吗? 最佳答案 它们在边缘情况下可能不同,例
例如: a = [[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.
这个问题在这里已经有了答案: Java 8 stream's .min() and .max(): why does this compile? (5 个答案) 关闭 6 年前。 我正在学习 1z0
我是一名优秀的程序员,十分优秀!