gpt4 book ai didi

apache-spark - 在多个列上应用自定义 Spark 聚合器 (Spark 2.0)

转载 作者:行者123 更新时间:2023-12-03 23:38:15 27 4
gpt4 key购买 nike

我创建了一个自定义 Aggregator[]对于字符串。

我想将它应用于 DataFrame 的所有列其中所有列都是字符串,但列号是任意的。

我坚持写正确的表达方式。我想写这样的东西:

df.agg( df.columns.map( c => myagg(df(c)) ) : _*) 

鉴于各种接口(interface),这显然是错误的。

我看过 RelationalGroupedDataset.agg(expr: Column, exprs: Column*)代码,但我不熟悉表达式操作。

任何的想法 ?

最佳答案

UserDefinedAggregateFunctions 相比,对单个字段(列)进行操作,Aggregtors期待完整的 Row/值(value)。

如果你想和Aggregator可以在您的代码段中使用它必须由列名参数化并使用 Row作为值类型。

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, Row}

case class Max(col: String)
extends Aggregator[Row, Int, Int] with Serializable {

def zero = Int.MinValue
def reduce(acc: Int, x: Row) =
Math.max(acc, Option(x.getAs[Int](col)).getOrElse(zero))

def merge(acc1: Int, acc2: Int) = Math.max(acc1, acc2)
def finish(acc: Int) = acc

def bufferEncoder: Encoder[Int] = Encoders.scalaInt
def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

示例用法:

val df = Seq((1, None, 3), (4, Some(5), -6)).toDF("x", "y", "z")

@transient val exprs = df.columns.map(c => Max(c).toColumn.alias(s"max($c)"))

df.agg(exprs.head, exprs.tail: _*)

+------+------+------+
|max(x)|max(y)|max(z)|
+------+------+------+
| 4| 5| 3|
+------+------+------+

可以说 Aggregators与静态类型 Datasets 结合使用时更有意义比 Dataset<Row> .

根据您的要求,您还可以使用 Seq[_] 一次聚合多个列。累加器和处理一个整体 Row (记录)在单个 merge称呼。

关于apache-spark - 在多个列上应用自定义 Spark 聚合器 (Spark 2.0),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41483774/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com