gpt4 book ai didi

group-by - 在 Pyspark 中展平分组

转载 作者:行者123 更新时间:2023-12-02 05:47:34 26 4
gpt4 key购买 nike

我有一个 pyspark 数据框。例如,

d= hiveContext.createDataFrame([("A", 1), ("B", 2), ("D", 3), ("D", 3),  ("A", 4), ("D", 3)],["Col1", "Col2"])

+----+----+
|Col1|Col2|
+----+----+
| A| 1|
| B| 2|
| D| 3|
| D| 3|
| A| 4|
| D| 3|
+----+----+

我想按 Col1 分组,然后创建一个 Col2 列表。我需要展平这些团体。我确实有很多专栏。

+----+----------+
|Col1| Col2|
+----+----------+
| A| [1,4] |
| B| [2] |
| D| [3,3,3]|
+----+----------+

最佳答案

您可以执行 groupBy() 并使用 collect_list() 作为聚合函数:

import pyspark.sql.functions as f
d.groupBy('Col1').agg(f.collect_list('Col2').alias('Col2')).show()
#+----+---------+
#|Col1| Col2|
#+----+---------+
#| B| [2]|
#| D|[3, 3, 3]|
#| A| [1, 4]|
#+----+---------+

更新

如果要合并多个列,可以对每个列使用 collect_list(),然后使用 struct()udf( )。考虑以下示例:

创建虚拟数据

from operator import add
import pyspark.sql.functions as f

# create example dataframe
d = sqlcx.createDataFrame(
[
("A", 1, 10),
("B", 2, 20),
("D", 3, 30),
("D", 3, 10),
("A", 4, 20),
("D", 3, 30)
],
["Col1", "Col2", "Col3"]
)

将所需的列收集到列表中

假设您有一个要收集到列表中的列列表。您可以执行以下操作:

cols_to_combine = ['Col2', 'Col3']
d.groupBy('Col1').agg(*[f.collect_list(c).alias(c) for c in cols_to_combine]).show()
#+----+---------+------------+
#|Col1| Col2| Col3|
#+----+---------+------------+
#| B| [2]| [20]|
#| D|[3, 3, 3]|[30, 10, 30]|
#| A| [4, 1]| [20, 10]|
#+----+---------+------------+

将结果列表合并为一列

现在我们要将列表列合并为一个列表。如果我们使用 struct(),我们将得到以下内容:

d.groupBy('Col1').agg(*[f.collect_list(c).alias(c) for c in cols_to_combine])\
.select('Col1', f.struct(*cols_to_combine).alias('Combined'))\
.show(truncate=False)
#+----+------------------------------------------------+
#|Col1|Combined |
#+----+------------------------------------------------+
#|B |[WrappedArray(2),WrappedArray(20)] |
#|D |[WrappedArray(3, 3, 3),WrappedArray(10, 30, 30)]|
#|A |[WrappedArray(1, 4),WrappedArray(10, 20)] |
#+----+------------------------------------------------+

展平环绕数组

快到了。我们只需要组合 WrappedArray。我们可以使用 udf() 实现这一点:

combine_wrapped_arrays = f.udf(lambda val: reduce(add, val), ArrayType(IntegerType()))
d.groupBy('Col1').agg(*[f.collect_list(c).alias(c) for c in cols_to_combine])\
.select('Col1', combine_wrapped_arrays(f.struct(*cols_to_combine)).alias('Combined'))\
.show(truncate=False)
#+----+---------------------+
#|Col1|Combined |
#+----+---------------------+
#|B |[2, 20] |
#|D |[3, 3, 3, 30, 10, 30]|
#|A |[1, 4, 10, 20] |
#+----+---------------------+

引用资料


更新2

一种更简单的方法,无需处理 WrappedArray:

from operator import add

combine_udf = lambda cols: f.udf(
lambda *args: reduce(add, args),
ArrayType(IntegerType())
)

d.groupBy('Col1').agg(*[f.collect_list(c).alias(c) for c in cols_to_combine])\
.select('Col1', combine_udf(cols_to_combine)(*cols_to_combine).alias('Combined'))\
.show(truncate=False)
#+----+---------------------+
#|Col1|Combined |
#+----+---------------------+
#|B |[2, 20] |
#|D |[3, 3, 3, 30, 10, 30]|
#|A |[1, 4, 10, 20] |
#+----+---------------------+

注意:最后一步仅在所有列的数据类型都相同时才有效。您不能使用此函数将包装数组与混合类型组合在一起。

关于group-by - 在 Pyspark 中展平分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48625917/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com