gpt4 book ai didi

python - 使用 PySpark 基于行值模式对记录进行分组

转载 作者:行者123 更新时间:2023-12-03 22:55:14 27 4
gpt4 key购买 nike

我有一个包含 3 列的表:

Table A:

+----+----+----------+
|col1|col2|row_number|
+----+----+----------+
| X| 1| 1|
| Y| 0| 2|
| Z| 2| 3|
| A| 1| 4|
| B| 0| 5|
| C| 0| 6|
| D| 2| 7|
| P| 1| 8|
| Q| 2| 9|
+----+----+----------+

我想通过基于“col2”值对记录进行分组来连接“col1”中的字符串。
“col2”的模式为 1,后跟任意数量的 0,后跟 2。我想将“col2”以 1 开头并以 2 结尾的记录分组(必须保持数据帧的顺序 - 您可以使用订单的 row_number 列)

例如,前 3 条记录可以组合在一起,因为“col2”具有“1-0-2”。接下来的 4 条记录可以组合在一起,因为它们的“col2”值具有“1-0-0-2”

在我对这些记录进行分组后,可以使用“concat_ws”完成连接部分。但是关于如何根据“1-0s-2”模式对这些记录进行分组有什么帮助吗?

预期输出:
+----------+
|output_col|
+----------+
| XYZ|
| ABCD|
| PQ|
+----------+

您可以使用以下代码来创建此示例数据:
schema = StructType([StructField("col1", StringType())\
,StructField("col2", IntegerType())\
,StructField("row_number", IntegerType())])

data = [['X', 1, 1], ['Y', 0, 2], ['Z', 2, 3], ['A', 1, 4], ['B', 0, 5], ['C', 0, 6], ['D', 2, 7], ['P', 1, 8], ['Q', 2, 9]]

df = spark.createDataFrame(data,schema=schema)
df.show()

最佳答案

我建议你使用 window职能。首先使用由row_number订购的窗口获得 增量和col2 . incremental sum将有 3 的倍数 这将基本上是 端点 您需要的组 .将它们替换为 同一窗口的滞后,获取您的 所需的分区 incremental_sum .现在您可以 groupBy incremental_sum列和 collect_list .您可以 array_join ( spark2.4 ) 在收集的列表中,以获得您想要的字符串。

from pyspark.sql import functions as F 
from pyspark.sql.window import Window
w=Window().orderBy("row_number")
df.withColumn("incremental_sum", F.sum("col2").over(w))\
.withColumn("lag", F.lag("incremental_sum").over(w))\
.withColumn("incremental_sum", F.when(F.col("incremental_sum")%3==0, F.col("lag")).otherwise(F.col("incremental_sum")))\
.groupBy("incremental_sum").agg(F.array_join(F.collect_list("col1"),"").alias("output_col")).drop("incremental_sum").show()
+----------+
|output_col|
+----------+
| XYZ|
| ABCD|
| PQ|
+----------+

关于python - 使用 PySpark 基于行值模式对记录进行分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60980308/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com