作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在处理一个表示事件流的数据集(例如作为来自网站的跟踪事件触发)。所有事件都有一个时间戳。我们经常遇到的一个用例是试图找到给定字段的第一个非空值。例如,像这样的事情让我们最接近那里:
val eventsDf = spark.read.json(jsonEventsPath)
case class ProjectedFields(visitId: String, userId: Int, timestamp: Long ... )
val projectedEventsDs = eventsDf.select(
eventsDf("message.visit.id").alias("visitId"),
eventsDf("message.property.user_id").alias("userId"),
eventsDf("message.property.timestamp"),
...
).as[ProjectedFields]
projectedEventsDs.groupBy($"visitId").agg(first($"userId", true))
first
的数据的顺序。不保证聚合功能。我希望它按
timestamp
排序以确保它是时间戳的第一个非空用户 ID,而不是任何随机的非空用户 ID。
case class OrderedKeyValue(key: String, value: String, ordering: Int)
val ds = Seq(
OrderedKeyValue("a", null, 1),
OrderedKeyValue("a", null, 2),
OrderedKeyValue("a", "x", 3),
OrderedKeyValue("a", "y", 4),
OrderedKeyValue("a", null, 5)
).toDS()
ds.orderBy("ordering").groupBy("key").agg(first("value", true)).collect()
Array([a,y])
有时
Array([a,x])
最佳答案
使用我心爱的 window (......体验你的生活变得多么简单!)
import org.apache.spark.sql.expressions.Window
val byKeyOrderByOrdering = Window
.partitionBy("key")
.orderBy("ordering")
.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
import org.apache.spark.sql.functions.first
val firsts = ds.withColumn("first",
first("value", ignoreNulls = true) over byKeyOrderByOrdering)
scala> firsts.show
+---+-----+--------+-----+
|key|value|ordering|first|
+---+-----+--------+-----+
| a| null| 1| x|
| a| null| 2| x|
| a| x| 3| x|
| a| y| 4| x|
| a| null| 5| x|
+---+-----+--------+-----+
rangeBetween
我认为应该是默认的无界范围。
关于apache-spark - 如何在组中找到第一个非空值? (使用数据集api进行二次排序),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42986965/
我是一名优秀的程序员,十分优秀!