作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有以下格式的 PySpark 数据框:
+-------+----------+---------------------+
| event | consumer | timestamp |
+-------+----------+---------------------+
| E | 1 | 2020-09-09 13:15:00 |
| E | 1 | 2020-09-09 13:30:00 |
| E | 1 | 2020-09-09 14:20:00 |
| T | 1 | 2020-09-09 14:35:00 |
| T | 2 | 2020-09-09 13:20:00 |
| E | 2 | 2020-09-09 13:25:00 |
| E | 2 | 2020-09-09 14:45:00 |
| T | 2 | 2020-09-09 14:50:00 |
+-------+----------+---------------------+
有没有一种方法可以遍历由 consumer
分区并按 timestamp
排序的组并将值设置为新列?
新列将定义 session_timestamp
。这就是它背后的逻辑:
E
开始。所以上面 Dataframe 的结果是:
+-------+----------+---------------------+---------------------+
| event | consumer | timestamp | session_timestamp |
+-------+----------+---------------------+---------------------+
| E | 1 | 2020-09-09 13:15:00 | 2020-09-09 13:15:00 |
| E | 1 | 2020-09-09 13:30:00 | 2020-09-09 13:15:00 |
| E | 1 | 2020-09-09 14:20:00 | 2020-09-09 14:20:00 |
| T | 1 | 2020-09-09 14:35:00 | 2020-09-09 14:20:00 |
| T | 2 | 2020-09-09 13:20:00 | Null |
| E | 2 | 2020-09-09 13:25:00 | 2020-09-09 13:25:00 |
| E | 2 | 2020-09-09 14:45:00 | 2020-09-09 14:45:00 |
| T | 2 | 2020-09-09 14:50:00 | 2020-09-09 14:45:00 |
+-------+----------+---------------------+---------------------+
有没有办法在 Pyspark 上做到这一点?
最佳答案
正如@Ofek 在评论中所说,window
功能会帮助你。这里给你一个scala的例子,你可以自己用python重写。 (考虑到pyspark中用户定义的聚合函数并不容易,这里收集并使用udf处理它)
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val df = <your-dataframe>
val findSessionStartTime = udf((rows: Seq[Seq[Any]]) => {
val parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
var result: Date = null
for (row <- rows.reverse) {
val event = row(0)
val time = parser.parse(row(1).toString)
if (event == "E") {
if (result == null || result.getTime - time.getTime < 3600000) {
result = time
}
}
}
if (result == null)
null
else
parser.format(result)
})
df.withColumn("events", collect_list(array($"event", $"timestamp")).over(Window
.partitionBy($"consumer")
.orderBy($"timestamp")))
.withColumn("session_timestamp", findSessionStartTime($"events"))
.drop("events")
.show(false)
结果如下:
(此外,您描述的示例结果不正确。2020-09-09 14:20:00
和2020-09-09 13:30:00
之间的时间是 50 分钟 < 1 小时)
+-----+--------+-------------------+-------------------+
|event|consumer|timestamp |session_timestamp |
+-----+--------+-------------------+-------------------+
|E |1 |2020-09-09 13:15:00|2020-09-09 13:15:00|
|E |1 |2020-09-09 13:30:00|2020-09-09 13:15:00|
|E |1 |2020-09-09 14:20:00|2020-09-09 13:15:00|
|T |1 |2020-09-09 14:35:00|2020-09-09 13:15:00|
|T |2 |2020-09-09 13:20:00|null |
|E |2 |2020-09-09 13:25:00|2020-09-09 13:25:00|
|E |2 |2020-09-09 14:45:00|2020-09-09 14:45:00|
|T |2 |2020-09-09 14:50:00|2020-09-09 14:45:00|
+-----+--------+-------------------+-------------------+
关于python - 有没有一种方法可以遍历 pyspark 数据框并在没有显式 session key 的情况下识别 session ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64374547/
我是一名优秀的程序员,十分优秀!