gpt4 book ai didi

apache-spark - Spark 数据帧同一列上的多个聚合操作

转载 作者:行者123 更新时间:2023-12-03 07:27:19 25 4
gpt4 key购买 nike

我有三个字符串类型的数组,其中包含以下信息:

  • groupBy 数组:包含我想要对数据进行分组的列的名称。
  • 聚合数组:包含我要聚合的列的名称。
  • 操作数组:包含我要执行的聚合操作

我正在尝试使用 Spark 数据帧来实现此目的。 Spark数据框架提供了一个agg(),您可以在其中传递Map [String,String](列名和相应的聚合操作)作为输入,但是我想对数据的同一列执行不同的聚合操作。关于如何实现这一目标有什么建议吗?

最佳答案

斯卡拉:

例如,您可以使用定义的从名称到函数的映射来映射函数列表:

import org.apache.spark.sql.functions.{col, min, max, mean}
import org.apache.spark.sql.Column

val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v")
val mapping: Map[String, Column => Column] = Map(
"min" -> min, "max" -> max, "mean" -> avg)

val groupBy = Seq("k")
val aggregate = Seq("v")
val operations = Seq("min", "max", "mean")
val exprs = aggregate.flatMap(c => operations .map(f => mapping(f)(col(c))))

df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show
// +---+------+------+------+
// | k|min(v)|max(v)|avg(v)|
// +---+------+------+------+
// | 1| 3.0| 3.0| 3.0|
// | 2| -5.0| -5.0| -5.0|
// +---+------+------+------+

df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show

不幸的是,内部使用的解析器SQLContext并未公开公开,但您始终可以尝试构建普通的 SQL 查询:

df.registerTempTable("df")
val groupExprs = groupBy.mkString(",")
val aggExprs = aggregate.flatMap(c => operations.map(
f => s"$f($c) AS ${c}_${f}")
).mkString(",")

sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs")

Python:

from pyspark.sql.functions import mean, sum, max, col

df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"])
groupBy = ["k"]
aggregate = ["v"]
funs = [mean, sum, max]

exprs = [f(col(c)) for f in funs for c in aggregate]

# or equivalent df.groupby(groupBy).agg(*exprs)
df.groupby(*groupBy).agg(*exprs)

另请参阅:

关于apache-spark - Spark 数据帧同一列上的多个聚合操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34954771/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com