gpt4 book ai didi

arrays - spark udaf 更新数组类型的元组

转载 作者:行者123 更新时间:2023-12-01 13:41:45 26 4
gpt4 key购买 nike

我正在使用 Scala + Spark 2.0 并尝试编写一个 UDAF,该 UDAF 将元组数组作为其内部缓冲区以及返回类型:...

def bufferSchema = new StructType().add("midResults", ArrayType(  StructType(Array(StructField("a", DoubleType),StructField("b", DoubleType))) ))

def dataType: DataType = ArrayType( StructType(Array(StructField( "a", DoubleType),StructField("b", DoubleType))) )

这就是我更新缓冲区的方式

def update(buffer: MutableAggregationBuffer, input: Row) = {
buffer(0) = buffer.getAs[mutable.WrappedArray[(Double,Double)]](3) ++ Array((3.0,4.0))
}

但我得到以下异常:

java.lang.ArrayStoreException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

如果我有一个简单的 Double 数组,这种模式就有效。

最佳答案

java.lang.ArrayStoreException"thrown to indicate that an attempt has been made to store the wrong type of object into an array of objects"这是预期的 because a local Scala type for StructType is o.a.s.sql.Row不是元组。换句话说,您应该使用 Seq[Row] 作为缓冲区字段,使用 Row 作为值。

注意事项:

  • 在循环中调用 ++ 可能不是最好的主意。
  • 如果您认为创建 UDAF 有点过时,因为 Spark 2.0 collect_list 支持复杂类型。
  • 可以说是 AggregatorsUserDefinedAggregateFunctions 对用户更友好。

关于arrays - spark udaf 更新数组类型的元组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39552845/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com