gpt4 book ai didi

scala - Spark UDAF - 使用泛型作为输入类型?

转载 作者:行者123 更新时间:2023-12-01 08:47:53 28 4
gpt4 key购买 nike

我想编写 Spark UDAF,其中列的类型可以是任何定义了 Scala Numeric 的类型。我在 Internet 上搜索过,但只找到了具体类型的示例,例如 DoubleTypeLongType。这不可能吗?但是,如何将该 UDAF 与其他数值一起使用呢?

最佳答案

为简单起见,我们假设您要定义自定义 sum。您将为输入类型提供 TypeTag 并使用 Scala 反射来定义模式:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import scala.reflect.runtime.universe._
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor

case class MySum [T : TypeTag](implicit n: Numeric[T])
extends UserDefinedAggregateFunction {

val dt = schemaFor[T].dataType
def inputSchema = new StructType().add("x", dt)
def bufferSchema = new StructType().add("x", dt)

def dataType = dt
def deterministic = true

def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, n.zero)
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0))
buffer.update(0, n.plus(buffer.getAs[T](0), input.getAs[T](0)))
}

def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1.update(0, n.plus(buffer1.getAs[T](0), buffer2.getAs[T](0)))
}

def evaluate(buffer: Row) = buffer.getAs[T](0)
}

使用上面定义的函数,我们可以创建处理特定类型的实例:

val sumOfLong = MySum[Long]
spark.range(10).select(sumOfLong($"id")).show
+---------+
|mysum(id)|
+---------+
| 45|
+---------+

注意:

要获得与内置聚合函数相同的灵 active ,您必须定义自己的 AggregateFunction,例如 ImperativeAggregateDeclarativeAggregate .这是可能的,但它是一个内部 API。

关于scala - Spark UDAF - 使用泛型作为输入类型?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43248719/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com