gpt4 book ai didi

python - 将最大值时间戳放入 PySpark 的数组中

转载 作者:太空宇宙 更新时间:2023-11-04 01:45:12 25 4
gpt4 key购买 nike

我有一个包含以下列的 PySpark 数据框(比如 df1)

1.> category - 包含独特的类别类型

2.> start_time_array - 按升序排列的时间戳数组

3.> end_time_array - 按升序排列的时间戳数组

4.> lenStart - start_time_array 中的数组长度

5.> lenEnd - end_time_array 中数组的长度

下面是df1的一个例子:

+--------+------------------------------------------+------------------------------------------+--------+------+
|category| end_time_array| start_time_array|lenStart|lenEnd|
+--------+------------------------------------------+------------------------------------------+--------+------+
| A|[2017-01-18 00:00:00, 2017-01-27 00:00:00]|[2017-01-16 00:00:00, 2017-01-25 00:00:00]| 2| 2|
| B| [2017-02-18 00:00:00]|[2017-02-14 00:00:00, 2017-02-21 00:00:00]| 2| 1|
+--------+------------------------------------------+------------------------------------------+--------+------+

还有另一个数据框df2,它包含两列categorytimestampdf2 包含与 df1 相同的 category 值,并且 df1 中数组内时间戳的值是df2 中的时间戳。以下是 df2

的示例
+--------+-------------------+
|category| timestamp|
+--------+-------------------+
| A|2017-01-16 00:00:00|
| A|2017-01-18 00:00:00|
| A|2017-01-25 00:00:00|
| A|2017-01-27 00:00:00|
| B|2017-02-14 00:00:00|
| B|2017-02-18 00:00:00|
| B|2017-02-21 00:00:00|
| B|2017-02-22 00:00:00|
| B|2017-02-24 00:00:00|
| B|2017-02-25 00:00:00|
+--------+-------------------+

正如我们在上面的 df1 示例中看到的,对于 category -> BlenStart=2 不等于 lenEnd=1。在 df1 的所有行中,lenStart = lenEndlenStart = lenEnd+1 对于 df1 中的所有行 其中 lenStart = lenEnd+1,我想取 timestamp 的最大值(属于适当的 category)并将其放在end_time_array 中的数组。我该怎么做?

以下是使用 df2 中的信息处理 df1 后的预期输出

+--------+------------------------------------------+------------------------------------------+--------+------+
|category| end_time_array| start_time_array|lenStart|lenEnd|
+--------+------------------------------------------+------------------------------------------+--------+------+
| A|[2017-01-18 00:00:00, 2017-01-27 00:00:00]|[2017-01-16 00:00:00, 2017-01-25 00:00:00]| 2| 2|
| B|[2017-02-18 00:00:00, 2017-02-25 00:00:00]|[2017-02-14 00:00:00, 2017-02-21 00:00:00]| 2| 2|
+--------+------------------------------------------+------------------------------------------+--------+------+

最佳答案

这应该适用于 Spark 1.5+:

import pyspark.sql.functions as F
df3 = df1.where(F.col('lenStart') == (F.col('lenEnd') + 1)).select('category')
df4 = df2.join(df3, 'Category').groupby('Category').agg(F.max('timestamp').alias('max'))
df5 = df1.join(df4, 'Category', 'left')
df1_changed = df5.withColumn('end_time_array', F.when(F.col('max').isNull(),
F.col('end_time_array')).otherwise(F.concat(F.col('end_time_array'),
F.array(F.col('max')))))
df1_changed = df1_changed.withColumn('lenEnd', F.size(F.col('end_time_array')))

df1_changed 将有一个修改过的 end_time_array 列,当您请求的条件适用时,它会添加所需的值,否则,它保持不变。

关于python - 将最大值时间戳放入 PySpark 的数组中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59224934/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com