
scala - How to write a Spark UDAF that simply collects rows?

Reposted · Author: 行者123 · Updated: 2023-12-04 17:51:39

For my particular requirement, I want to write a UDAF that simply collects all input rows.

The input is a row with two columns, both of Double type.

What I *think* the intermediate schema should be is an ArrayList (please correct me if I am wrong).

The returned data type is also an ArrayList.

I have written an outline of my UDAF, and I would like some help completing it.

import java.util

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class CollectorUDAF() extends UserDefinedAggregateFunction {

  // Input Data Type Schema
  def inputSchema: StructType = StructType(Array(StructField("value", DoubleType), StructField("y", DoubleType)))

  // Intermediate Schema -- my guess is some kind of ArrayList of the
  // (value, y) pairs, but I don't know how to express that here
  def bufferSchema: StructType = ???

  // Returned Data Type -- again, I want an ArrayList of the collected values
  def dataType: DataType = ???

  // Self-explaining
  def deterministic = true

  // This function is called whenever key changes
  def initialize(buffer: MutableAggregationBuffer) = {

  }

  // Iterate over each entry of a group
  def update(buffer: MutableAggregationBuffer, input: Row) = {

  }

  // Called after all the entries are exhausted.
  def evaluate(buffer: Row) = {

  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {

  }

}

Best Answer

If I understand your question correctly, the following should be the solution:

import java.util

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class CollectorUDAF() extends UserDefinedAggregateFunction {

  // Input Data Type Schema
  def inputSchema: StructType = new StructType().add("value", DataTypes.DoubleType).add("y", DataTypes.DoubleType)

  // Intermediate Schema: a single array column holding all collected doubles
  val bufferFields: util.ArrayList[StructField] = new util.ArrayList[StructField]
  val bufferStructField: StructField = DataTypes.createStructField("array", DataTypes.createArrayType(DataTypes.DoubleType, true), true)
  bufferFields.add(bufferStructField)
  def bufferSchema: StructType = DataTypes.createStructType(bufferFields)

  // Returned Data Type
  def dataType: DataType = DataTypes.createArrayType(DataTypes.DoubleType)

  // Self-explaining
  def deterministic = true

  // This function is called whenever key changes
  def initialize(buffer: MutableAggregationBuffer) = {
    buffer.update(0, new util.ArrayList[Double])
  }

  // Iterate over each entry of a group: append both columns of the input row
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    val doubleList: util.ArrayList[Double] = new util.ArrayList[Double](buffer.getList(0))
    doubleList.add(input.getDouble(0))
    doubleList.add(input.getDouble(1))
    buffer.update(0, doubleList)
  }

  // Combine the partial buffers of two partitions
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getList(0).toArray() ++ buffer2.getList(0).toArray())
  }

  // Called after all the entries are exhausted.
  def evaluate(buffer: Row) = {
    buffer.getList(0).toArray()
  }
}
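To see how the initialize/update/merge lifecycle fits together without spinning up Spark, here is a minimal plain-Scala sketch of the same contract. The helper names below are illustrative only (they are not part of the Spark API), and a mutable `ArrayBuffer` stands in for the aggregation buffer:

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-in for initialize(): start with an empty buffer.
def initializeBuffer(): ArrayBuffer[Double] = ArrayBuffer.empty[Double]

// Stand-in for update(): append both columns of one (value, y) input row.
def updateBuffer(buffer: ArrayBuffer[Double], value: Double, y: Double): ArrayBuffer[Double] = {
  buffer += value
  buffer += y
  buffer
}

// Stand-in for merge(): concatenate the partial results of two partitions.
def mergeBuffers(b1: ArrayBuffer[Double], b2: ArrayBuffer[Double]): ArrayBuffer[Double] =
  b1 ++ b2

// Simulate two partitions of rows being aggregated.
val partition1 = updateBuffer(updateBuffer(initializeBuffer(), 1.0, 2.0), 3.0, 4.0)
val partition2 = updateBuffer(initializeBuffer(), 5.0, 6.0)
val result = mergeBuffers(partition1, partition2)
println(result.mkString(", "))  // 1.0, 2.0, 3.0, 4.0, 5.0, 6.0
```

Note that, exactly as in the UDAF, the result interleaves the two columns of each row rather than keeping them as pairs; if you need (value, y) tuples back, you would have to re-group the flat list in pairs after `evaluate`.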

On "scala - How to write a Spark UDAF that simply collects rows?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/42357013/
