gpt4 book ai didi

apache-spark - 两个整数之间的差异返回 None 与 PySpark?

转载 作者:行者123 更新时间:2023-12-02 04:36:42 26 4
gpt4 key购买 nike

我正在尝试在我的 spark 数据中集成一个由滚动时间窗口定义的“用户 session ”。

我一直在使用这个问题:How to aggregate over rolling time window with groups in Spark

我的不同之处在于我希望我的时间窗口大约为 5 小时,所以我不能使用会返回天数的 datediff。

这是我的数据集:

[Row(auction_id_64=9999, datetime=datetime.datetime(2016, 12, 5, 3, 42, 17), user_id_64=123),
Row(auction_id_64=8888, datetime=datetime.datetime(2016, 12, 7, 3, 7, 23), user_id_64=123),
Row(auction_id_64=5555, datetime=datetime.datetime(2016, 12, 7, 3, 2, 7), user_id_64=123),
Row(auction_id_64=4444, datetime=datetime.datetime(2016, 12, 7, 3, 41, 49), user_id_64=456),
Row(auction_id_64=3333, datetime=datetime.datetime(2016, 12, 7, 3, 40, 54), user_id_64=456),
Row(auction_id_64=7777, datetime=datetime.datetime(2016, 12, 7, 18, 42, 17), user_id_64=456),
Row(auction_id_64=6666, datetime=datetime.datetime(2016, 12, 7, 3, 7, 23), user_id_64=789),
Row(auction_id_64=2222, datetime=datetime.datetime(2016, 12, 7, 3, 2, 7), user_id_64=789),
Row(auction_id_64=1111, datetime=datetime.datetime(2016, 12, 7, 3, 41, 49), user_id_64=789),
Row(auction_id_64=1212, datetime=datetime.datetime(2016, 12, 9, 3, 40, 54), user_id_64=789)]

我所需要的只是添加一个列,该列将按用户对 session 进行索引。 (比如auction_id 9999是session 0,auction_id 8888和auction_id 5555是session 1(因为9999到8888之间有很多天,而8888到5555之间只有几分钟)。而接下来我们从0开始索引用户。

这是我的代码:

# Add a timestamp (integer) column
df = df.withColumn('timestamp', unix_timestamp(df['datetime']).cast('integer'))

# We partition by user and order by timestamp
w = Window.partitionBy("user_id_64").orderBy("timestamp")

# we compute the absolute difference between timestamp and timestamp from the previous line. If no result, 0 is returned.
diff = coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0))

# If difference higher than 5 hours
indicator = (diff > 5 * 60 * 60).cast("integer")

# We increment for each indicator = 1
subgroup = sum(test).over(w).alias("session_index")

# We get everything
df = df.select("*", subgroup)

最后,每个人的session_index都是0。问题来自行 diff = coalesce(abs("timestamp"- lag("timestamp", 1).over(w)), lit(0))。在这里,每次返回的是 lit(0) (我通过更改 0 值进行检查)。所以我尝试通过更改几行来简化我的脚本:

test = "timestamp" - lag("timestamp", 1).over(w)
subgroup = sum(test).over(w).alias("session_index")

我删除了 coalesce 和 abs 函数。每行的 session_index 都是“无”。

如果我用 test = "timestamp" 替换 test,这会很好:我会得到时间戳的累加和。

如果我用 test = lag("timestamp", 1).over(w) 替换它,它也会很好,我会在用户的第一行得到 None (因为没有前一行),然后是累计和。

当我尝试减去我的两个整数时,问题就来了。但我不明白为什么?它是两个整数,结果也应该是一个整数,不是吗?

感谢您能为我提供的任何帮助。

最佳答案

如果它是两个整数之间的差值会很奇怪,但事实并非如此。让我们再看看罪魁祸首:

coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0))

用于减法的左侧操作数是一个 strstr 没有可以对Column 进行操作的__sub__,所以我们使用右边操作数的__rsub__。通常,Column 的双下划线方法将标准 Python 类型解释为文字。所以你的代码实际上试图从字符串“timestamp”中减去整数,结果是未定义的。

TL;DR 您应该使用 Column 作为左侧操作数:

from pyspark.sql.functions import col

coalesce(abs(col("timestamp") - lag("timestamp", 1).over(w)), lit(0))

关于apache-spark - 两个整数之间的差异返回 None 与 PySpark?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41850761/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com