
apache-spark - How to use the lag and rangeBetween functions on timestamp values?


I have data that looks like this:

userid,eventtime,location_point
4e191908,2017-06-04 03:00:00,18685891
4e191908,2017-06-04 03:04:00,18685891
3136afcb,2017-06-04 03:03:00,18382821
661212dd,2017-06-04 03:06:00,80831484
40e8a7c3,2017-06-04 03:12:00,18825769

I want to add a new boolean column that is flagged true if there are 2 or more location_point records within a 5-minute window for the same userid. My idea was to use the lag function, looking over a window partitioned by userid whose range is between the current timestamp and the next 5 minutes:

from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col

days = lambda i: i * 60*5

windowSpec = W.partitionBy(col("userid")).orderBy(col("eventtime").cast("timestamp").cast("long")).rangeBetween(0, days(5))

last_location_point = F.lag(col("location_point"), 1).over(windowSpec)
visitCheck = (last_location_point == output.location_point)
output.withColumn("visit_check", visitCheck).select("userid", "eventtime", "location_point", "visit_check")

When I use the rangeBetween frame, this code gives me an AnalysisException:

AnalysisException: u'Window Frame RANGE BETWEEN CURRENT ROW AND 1500 FOLLOWING must match the required frame ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING;



Do you know of any way to work around this problem?

Best Answer

Given your data:

Let's add a column with the timestamp in seconds:

df = df.withColumn('timestamp', df.eventtime.astype('timestamp').cast("long"))
df.show()

+--------+-------------------+--------------+----------+
| userid| eventtime|location_point| timestamp|
+--------+-------------------+--------------+----------+
|4e191908|2017-06-04 03:00:00| 18685891|1496545200|
|4e191908|2017-06-04 03:04:00| 18685891|1496545440|
|3136afcb|2017-06-04 03:03:00| 18382821|1496545380|
|661212dd|2017-06-04 03:06:00| 80831484|1496545560|
|40e8a7c3|2017-06-04 03:12:00| 18825769|1496545920|
|4e191908|2017-06-04 03:11:30| 18685891|1496545890|
+--------+-------------------+--------------+----------+
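
As a side note (my own suggestion, not part of the original answer), the same column can also be obtained with unix_timestamp instead of the double cast, which makes the string-to-timestamp parsing explicit:

from pyspark.sql import functions as F

# Equivalent sketch: parse eventtime and convert it to epoch seconds.
# Assumes eventtime is a string formatted as 'yyyy-MM-dd HH:mm:ss'.
df = df.withColumn('timestamp', F.unix_timestamp('eventtime', 'yyyy-MM-dd HH:mm:ss'))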

Now, let's define a window, partitioned by location_point, ordered by timestamp, with a range between -300 seconds and the current row. We can count the number of elements in this window and put that value in a column named occurrences_in_5_min:

from pyspark.sql import Window

w = Window.partitionBy('location_point').orderBy('timestamp').rangeBetween(-60*5, 0)
df = df.withColumn('occurrences_in_5_min', F.count('timestamp').over(w))
df.show()

+--------+-------------------+--------------+----------+--------------------+
| userid| eventtime|location_point| timestamp|occurrences_in_5_min|
+--------+-------------------+--------------+----------+--------------------+
|40e8a7c3|2017-06-04 03:12:00| 18825769|1496545920| 1|
|3136afcb|2017-06-04 03:03:00| 18382821|1496545380| 1|
|661212dd|2017-06-04 03:06:00| 80831484|1496545560| 1|
|4e191908|2017-06-04 03:00:00| 18685891|1496545200| 1|
|4e191908|2017-06-04 03:04:00| 18685891|1496545440| 2|
|4e191908|2017-06-04 03:11:30| 18685891|1496545890| 1|
+--------+-------------------+--------------+----------+--------------------+
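
Note that this window is partitioned only by location_point, so different users at the same location are counted together. If the flag must apply per userid, as the question asks, a sketch of my own adaptation (not in the original answer) would partition by both columns:

# Sketch: count a user's own visits to the same location in the last 5 minutes.
w_user = (Window.partitionBy('userid', 'location_point')
                .orderBy('timestamp')
                .rangeBetween(-60*5, 0))
df = df.withColumn('occurrences_in_5_min', F.count('timestamp').over(w_user))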

Now the desired column can be added, set to True whenever the number of occurrences at a given location within the last 5 minutes is strictly greater than 1:

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

add_bool = udf(lambda col: True if col > 1 else False, BooleanType())
df = df.withColumn('already_occured', add_bool('occurrences_in_5_min'))
df.show()

+--------+-------------------+--------------+----------+--------------------+---------------+
| userid| eventtime|location_point| timestamp|occurrences_in_5_min|already_occured|
+--------+-------------------+--------------+----------+--------------------+---------------+
|40e8a7c3|2017-06-04 03:12:00| 18825769|1496545920| 1| false|
|3136afcb|2017-06-04 03:03:00| 18382821|1496545380| 1| false|
|661212dd|2017-06-04 03:06:00| 80831484|1496545560| 1| false|
|4e191908|2017-06-04 03:00:00| 18685891|1496545200| 1| false|
|4e191908|2017-06-04 03:04:00| 18685891|1496545440| 2| true|
|4e191908|2017-06-04 03:11:30| 18685891|1496545890| 1| false|
+--------+-------------------+--------------+----------+--------------------+---------------+
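
As a closing remark (my own suggestion, not part of the original answer), the Python UDF is not strictly needed here; a plain column comparison gives the same boolean result and avoids serializing rows to Python:

from pyspark.sql import functions as F

# Sketch: same already_occured column, built from a native column expression.
df = df.withColumn('already_occured', F.col('occurrences_in_5_min') > 1)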

Regarding apache-spark - How to use the lag and rangeBetween functions on timestamp values?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/45531662/
