gpt4 book ai didi

python - 比较 Pyspark 中的列

转载 作者:太空狗 更新时间:2023-10-29 18:20:55 25 4
gpt4 key购买 nike

我正在处理一个包含 n 列的 PySpark DataFrame。我有一组 m 列 (m < n),我的任务是选择其中包含最大值的列。

例如:

输入:PySpark DataFrame 包含:

col_1 = [1,2,3], col_2 = [2,1,4], col_3 = [3,2,5]

输出:

col_4 = max(col1, col_2, col_3) = [3,2,5]

Pandas 中有类似的东西,如 this 中所述。问题。

在 PySpark 中有什么方法可以做到这一点,还是我应该更改将我的 PySpark df 转换为 Pandas df 然后执行操作?

最佳答案

您可以减少对列列表使用 SQL 表达式:

from pyspark.sql.functions import max as max_, col, when
from functools import reduce

def row_max(*cols):
return reduce(
lambda x, y: when(x > y, x).otherwise(y),
[col(c) if isinstance(c, str) else c for c in cols]
)

df = (sc.parallelize([(1, 2, 3), (2, 1, 2), (3, 4, 5)])
.toDF(["a", "b", "c"]))

df.select(row_max("a", "b", "c").alias("max")))

Spark 1.5+ 还提供了least, greatest

from pyspark.sql.functions import greatest

df.select(greatest("a", "b", "c"))

如果你想保留最大值的名称,你可以使用`structs:

from pyspark.sql.functions import struct, lit

def row_max_with_name(*cols):
cols_ = [struct(col(c).alias("value"), lit(c).alias("col")) for c in cols]
return greatest(*cols_).alias("greatest({0})".format(",".join(cols)))

maxs = df.select(row_max_with_name("a", "b", "c").alias("maxs"))

最后您可以使用上面的内容来查找选择的“顶部”列:

from pyspark.sql.functions import max

((_, c), ) = (maxs
.groupBy(col("maxs")["col"].alias("col"))
.count()
.agg(max(struct(col("count"), col("col"))))
.first())

df.select(c)

关于python - 比较 Pyspark 中的列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37673414/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com