gpt4 book ai didi

python - 根据列值的变化对 pyspark 数据框进行分区

转载 作者:太空宇宙 更新时间:2023-11-04 08:38:16 24 4
gpt4 key购买 nike

我在 pyspark 中有一个数据框。假设有一些列 a、b、c ...随着列值的变化,我想将数据分组。说

A  B
1 x
1 y
0 x
0 y
0 x
1 y
1 x
1 y

将有 3 个组作为 (1x,1y),(0x,0y,0x),(1y,1x,1y)以及对应的行数据

最佳答案

如果我理解正确,您希望在每次 A 列更改值时创建一个不同的组。

首先,我们将创建一个单调递增的 id 以保持行顺序不变:

import pyspark.sql.functions as psf
df = sc.parallelize([[1,'x'],[1,'y'],[0,'x'],[0,'y'],[0,'x'],[1,'y'],[1,'x'],[1,'y']])\
.toDF(['A', 'B'])\
.withColumn("rn", psf.monotonically_increasing_id())
df.show()

+---+---+----------+
| A| B| rn|
+---+---+----------+
| 1| x| 0|
| 1| y| 1|
| 0| x| 2|
| 0| y| 3|
| 0| x|8589934592|
| 1| y|8589934593|
| 1| x|8589934594|
| 1| y|8589934595|
+---+---+----------+

现在我们将使用一个窗口函数来创建一个包含 1 的列,每次 A 列发生变化时:

from pyspark.sql import Window
w = Window.orderBy('rn')
df = df.withColumn("changed", (df.A != psf.lag('A', 1, 0).over(w)).cast('int'))

+---+---+----------+-------+
| A| B| rn|changed|
+---+---+----------+-------+
| 1| x| 0| 1|
| 1| y| 1| 0|
| 0| x| 2| 1|
| 0| y| 3| 0|
| 0| x|8589934592| 0|
| 1| y|8589934593| 1|
| 1| x|8589934594| 0|
| 1| y|8589934595| 0|
+---+---+----------+-------+

最后我们将使用另一个窗口函数为每个组分配不同的数字:

df = df.withColumn("group_id", psf.sum("changed").over(w)).drop("rn").drop("changed")

+---+---+--------+
| A| B|group_id|
+---+---+--------+
| 1| x| 1|
| 1| y| 1|
| 0| x| 2|
| 0| y| 2|
| 0| x| 2|
| 1| y| 3|
| 1| x| 3|
| 1| y| 3|
+---+---+--------+

现在你可以建立你的群组了

关于python - 根据列值的变化对 pyspark 数据框进行分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47014555/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com