gpt4 book ai didi

python - 基于特定开始和结束事件的 Pyspark 拆分 session

转载 作者:行者123 更新时间:2023-12-04 10:00:59 24 4
gpt4 key购买 nike

我的 df 是按 session 和时间戳排序的,如下所示;

    df = spark.createDataFrame(
[[1, '2020-01-01 12:30:00.000', 'foo'], [1, '2020-01-01 12:31:00.000', 'bar'], [1, '2020-01-01 12:32:00.000', 'foo'],
[1, '2020-01-01 12:33:00.000', 'foo'], [2, '2020-01-01 13:00:00.000', 'bar'], [2, '2020-01-01 13:01:00.000', 'foo'],
[2, '2020-01-01 13:02:00.000', 'bar'], [2, '2020-01-01 13:03:00.000', 'foo']],
['session_id', 'timestamp', 'event']
)
df.show(truncate=False)
+----------+-----------------------+-----+
|session_id|timestamp |event|
+----------+-----------------------+-----+
|1 |2020-01-01 12:30:00.000|foo |
|1 |2020-01-01 12:31:00.000|bar |
|1 |2020-01-01 12:32:00.000|foo |
|1 |2020-01-01 12:33:00.000|foo |
|2 |2020-01-01 13:00:00.000|bar |
|2 |2020-01-01 13:01:00.000|foo |
|2 |2020-01-01 13:02:00.000|bar |
|2 |2020-01-01 13:03:00.000|foo |
+----------+-----------------------+-----+

我希望我的 session 遵循特定模式。他们需要从事件“bar”开始,并有一个(或多个)“foo”事件。每当发生新的“酒吧”事件时,我都想将其归类为新 session 。不属于这种模式的事件应该被删除,例如最初的 'foo' 事件。
所需的输出应如下所示:
    df_res = spark.createDataFrame(
[[1, '2020-01-01 12:31:00.000', 'bar'], [1, '2020-01-01 12:32:00.000', 'foo'],
[1, '2020-01-01 12:33:00.000', 'foo'], [2, '2020-01-01 13:00:00.000', 'bar'], [2, '2020-01-01 13:01:00.000', 'foo'],
[3, '2020-01-01 13:02:00.000', 'bar'], [3, '2020-01-01 13:03:00.000', 'foo']],
['session_id', 'timestamp', 'event']
)
df_res.show(truncate=False)
+----------+------------------------+-----+
|session_id|timestamp |event|
+----------+------------------------+-----+
|1 |2020-01-01 12:31:00.000 |bar |
|1 |2020-01-01 12:32:00.000 |foo |
|1 |2020-01-01 12:33:00.000 |foo |
|2 |2020-01-01 13:00:00.000 |bar |
|2 |2020-01-01 13:01:00.000 |foo |
|3 |2020-01-01 13:02:00.000 |bar |
|3 |2020-01-01 13:03:00.000 |foo |
+----------+------------------------+-----+

我试过做一个 groupby 和 collect_list 然后拆分或展平,但我不确定如何继续。欢迎任何帮助!
df.groupBy("session_id").agg(F.collect_list("event").alias("list_event"))
#does not work
# tst_udf = udf(lambda l: split(l, 'bar'))
# df = df.withColumn("tst", tst_udf(col('list_event')))

-编辑
我的最终目标是旋转这个表,每个 session 有一行,在那里我有关于 'bar' 和(多个)'foo' 事件的变量。

最佳答案

尝试这个:
welcome to SO

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w=Window().orderBy("timestamp")
w2=Window().partitionBy("session_id").orderBy("timestamp")
w3=Window().partitionBy("session_id")
df.withColumn("timestamp", F.to_timestamp("timestamp", 'yyyy-MM-dd HH:mm:ss.SSS'))\
.withColumn("session_id", F.sum(F.when((F.col("event")=='bar'),F.lit(1))\
.otherwise(F.lit(0))).over(w))\
.withColumn("rowNum", F.row_number().over(w2))\
.withColumn("max", F.max("rowNum").over(w3))\
.withColumn("first", F.when((F.col("rowNum")==1)&(F.col("event")=='foo'), F.lit(1))\
.otherwise(F.lit(0)))\
.filter('max>=2 and first=0').drop(*['rowNum','sample_timestamp','max','first']).show()

#+----------+-------------------+-----+
#|session_id| timestamp|event|
#+----------+-------------------+-----+
#| 1|2020-01-01 12:31:00| bar|
#| 1|2020-01-01 12:32:00| foo|
#| 1|2020-01-01 12:33:00| foo|
#| 2|2020-01-01 13:00:00| bar|
#| 2|2020-01-01 13:01:00| foo|
#| 3|2020-01-01 13:02:00| bar|
#| 3|2020-01-01 13:03:00| foo|
#+----------+-------------------+-----+

关于python - 基于特定开始和结束事件的 Pyspark 拆分 session ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61825220/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com