sql - Spark SQL: how to create sessionId for user-item

Suppose I have a dataset like this:

|    item    | event       | timestamp |   user    |
|:-----------|------------:|:---------:|:---------:|
| titanic | view | 1 | 1 |
| titanic | add to bag | 2 | 1 |
| titanic | close | 3 | 1 |
| avatar | view | 6 | 1 |
| avatar | close | 10 | 1 |
| titanic | view | 20 | 1 |
| titanic | purchase | 30 | 1 |

And so on. For each user, I need to compute a sessionId covering the consecutive events that belong to a particular item.

So for the data above, the output should be:

|    item    | event       | timestamp |   user    |   sessionId    |
|:-----------|------------:|:---------:|:---------:|:--------------:|
| titanic | view | 1 | 1 | session1 |
| titanic | add to bag | 2 | 1 | session1 |
| titanic | close | 3 | 1 | session1 |
| avatar | view | 6 | 1 | session2 |
| avatar | close | 10 | 1 | session2 |
| titanic | view | 20 | 1 | session3 |
| titanic | purchase | 30 | 1 | session3 |

I tried an approach similar to the one described in Spark: How to create a sessionId based on userId and timestamp, using the window:

Window.partitionBy("user", "item").orderBy("timestamp")

But this doesn't work, because the same user-item combination can occur in different sessions. For example, see session1 and session3: with that window they would end up in the same session. I need help with a different approach.
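A minimal sketch of why that window cannot work (illustrative only; it assumes a Spark shell where spark.implicits._ is in scope, and the events DataFrame below just mirrors the sample data): inside the (user, item) partition the two titanic visits form one unbroken run, and the avatar rows that separate them live in a different partition, so no per-partition rule can tell them apart.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val events = Seq(
  ("titanic", "view", 1, 1), ("titanic", "add to bag", 2, 1), ("titanic", "close", 3, 1),
  ("avatar", "view", 6, 1), ("avatar", "close", 10, 1),
  ("titanic", "view", 20, 1), ("titanic", "purchase", 30, 1)
).toDF("item", "event", "timestamp", "user")

// Within the (user, item) partition the titanic rows get row numbers 1..5,
// i.e. timestamps 1-3 and 20-30 look like one contiguous group.
events.withColumn("rn",
  row_number.over(Window.partitionBy($"user", $"item").orderBy($"timestamp"))
).orderBy("user", "timestamp").show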

Best Answer

Here is one approach: first generate a timestamp column that is non-null only where a new session starts, then use last(ts, ignoreNulls) with rowsBetween to carry the last non-null timestamp forward, and finally construct sessionId with dense_rank:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// assumes spark.implicits._ is in scope (automatic in spark-shell) for $ and toDF
val df = Seq(
  ("titanic", "view", 1, 1),
  ("titanic", "add to bag", 2, 1),
  ("titanic", "close", 3, 1),
  ("avatar", "view", 6, 1),
  ("avatar", "close", 10, 1),
  ("titanic", "view", 20, 1),
  ("titanic", "purchase", 30, 1)
).toDF("item", "event", "timestamp", "user")

val win1 = Window.partitionBy($"user").orderBy($"timestamp")
val win2 = Window.partitionBy($"user").orderBy($"sessTS")

df.
  // firstTS: the timestamp of a session start (first row per user, or any row
  // whose item differs from the previous row's item); null otherwise
  withColumn("firstTS",
    when(row_number.over(win1) === 1 || $"item" =!= lag($"item", 1).over(win1),
      $"timestamp")
  ).
  // sessTS: carry the last non-null firstTS forward across each session
  withColumn("sessTS",
    last($"firstTS", ignoreNulls = true).
      over(win1.rowsBetween(Window.unboundedPreceding, 0))
  ).
  // dense_rank over the session-start timestamps numbers the sessions 1, 2, 3, ...
  withColumn("sessionId", concat(lit("session"), dense_rank.over(win2))).
  show

// +-------+----------+---------+----+-------+------+---------+
// | item| event|timestamp|user|firstTS|sessTS|sessionId|
// +-------+----------+---------+----+-------+------+---------+
// |titanic| view| 1| 1| 1| 1| session1|
// |titanic|add to bag| 2| 1| null| 1| session1|
// |titanic| close| 3| 1| null| 1| session1|
// | avatar| view| 6| 1| 6| 6| session2|
// | avatar| close| 10| 1| null| 6| session2|
// |titanic| view| 20| 1| 20| 20| session3|
// |titanic| purchase| 30| 1| null| 20| session3|
// +-------+----------+---------+----+-------+------+---------+
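As a side note, the same grouping can also be expressed without the null-and-fill step: flag the rows where a user's item changes, then take a running sum of the flags as the session number. This is a sketch of that alternative (not the answer's original code), reusing the df and imports defined above:

val byUser = Window.partitionBy($"user").orderBy($"timestamp")

df.
  // 1 on the first row per user and whenever the item changes, 0 otherwise
  withColumn("newSession",
    when(lag($"item", 1).over(byUser).isNull || lag($"item", 1).over(byUser) =!= $"item", 1).
      otherwise(0)).
  // the running sum of the flags numbers the sessions 1, 2, 3, ...
  withColumn("sessionId",
    concat(lit("session"),
      sum($"newSession").over(byUser.rowsBetween(Window.unboundedPreceding, 0)))).
  drop("newSession").
  show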

Regarding sql - Spark SQL: how to create sessionId for user-item, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/51575900/
