gpt4 book ai didi

python - 有没有一种方法可以遍历 pyspark 数据框并在没有显式 session key 的情况下识别 session ?

转载 作者:行者123 更新时间:2023-12-05 06:59:35 25 4
gpt4 key购买 nike

我有以下格式的 PySpark 数据框:

+-------+----------+---------------------+
| event | consumer | timestamp |
+-------+----------+---------------------+
| E | 1 | 2020-09-09 13:15:00 |
| E | 1 | 2020-09-09 13:30:00 |
| E | 1 | 2020-09-09 14:20:00 |
| T | 1 | 2020-09-09 14:35:00 |
| T | 2 | 2020-09-09 13:20:00 |
| E | 2 | 2020-09-09 13:25:00 |
| E | 2 | 2020-09-09 14:45:00 |
| T | 2 | 2020-09-09 14:50:00 |
+-------+----------+---------------------+

有没有一种方法可以遍历由 consumer 分区并按 timestamp 排序的组并将值设置为新列?

新列将定义 session_timestamp。这就是它背后的逻辑:

  • session 仅以事件 E 开始。
  • 如果在 session 开始后一小时内发生新事件,则它属于该 session 。
  • 如果某个事件发生的时间超过启动 session 的事件的一个小时,则它属于另一个 session (这就是 DataFrame 中第 2 行和第 3 行之间发生的情况)。

所以上面 Dataframe 的结果是:

+-------+----------+---------------------+---------------------+
| event | consumer | timestamp | session_timestamp |
+-------+----------+---------------------+---------------------+
| E | 1 | 2020-09-09 13:15:00 | 2020-09-09 13:15:00 |
| E | 1 | 2020-09-09 13:30:00 | 2020-09-09 13:15:00 |
| E | 1 | 2020-09-09 14:20:00 | 2020-09-09 14:20:00 |
| T | 1 | 2020-09-09 14:35:00 | 2020-09-09 14:20:00 |
| T | 2 | 2020-09-09 13:20:00 | Null |
| E | 2 | 2020-09-09 13:25:00 | 2020-09-09 13:25:00 |
| E | 2 | 2020-09-09 14:45:00 | 2020-09-09 14:45:00 |
| T | 2 | 2020-09-09 14:50:00 | 2020-09-09 14:45:00 |
+-------+----------+---------------------+---------------------+

有没有办法在 Pyspark 上做到这一点?

最佳答案

正如@Ofek 在评论中所说,window功能会帮助你。这里给你一个scala的例子,你可以自己用python重写。 (考虑到pyspark中用户定义的聚合函数并不容易,这里收集并使用udf处理它)

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = <your-dataframe>

val findSessionStartTime = udf((rows: Seq[Seq[Any]]) => {
val parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

var result: Date = null
for (row <- rows.reverse) {
val event = row(0)
val time = parser.parse(row(1).toString)
if (event == "E") {
if (result == null || result.getTime - time.getTime < 3600000) {
result = time
}
}
}
if (result == null)
null
else
parser.format(result)
})

df.withColumn("events", collect_list(array($"event", $"timestamp")).over(Window
.partitionBy($"consumer")
.orderBy($"timestamp")))
.withColumn("session_timestamp", findSessionStartTime($"events"))
.drop("events")
.show(false)

结果如下:

(此外,您描述的示例结果不正确。2020-09-09 14:20:002020-09-09 13:30:00 之间的时间是 50 分钟 < 1 小时)

+-----+--------+-------------------+-------------------+
|event|consumer|timestamp |session_timestamp |
+-----+--------+-------------------+-------------------+
|E |1 |2020-09-09 13:15:00|2020-09-09 13:15:00|
|E |1 |2020-09-09 13:30:00|2020-09-09 13:15:00|
|E |1 |2020-09-09 14:20:00|2020-09-09 13:15:00|
|T |1 |2020-09-09 14:35:00|2020-09-09 13:15:00|
|T |2 |2020-09-09 13:20:00|null |
|E |2 |2020-09-09 13:25:00|2020-09-09 13:25:00|
|E |2 |2020-09-09 14:45:00|2020-09-09 14:45:00|
|T |2 |2020-09-09 14:50:00|2020-09-09 14:45:00|
+-----+--------+-------------------+-------------------+

关于python - 有没有一种方法可以遍历 pyspark 数据框并在没有显式 session key 的情况下识别 session ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64374547/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com