gpt4 book ai didi

scala - UDAF Spark 中的多列输出

转载 作者:行者123 更新时间:2023-12-02 04:35:58 25 4
gpt4 key购买 nike

我从我的 mongodb 中获取了一些数据,如下所示:

     +------+-------+
| view | data |
+------+-------+
| xx | *** |
| yy | *** |
| xx | *** |
+------+-------+

其实没必要知道里面是什么。

我写了一个像这样的 UserDefinedAggregateFunction 因为我想在 View 上分组。:

class Extractor() extends UserDefinedAggregateFunction{
override def inputSchema: StructType = // some stuff

override def bufferSchema: StructType =
StructType(
List(
StructField("0",IntegerType,false),
StructField("1",IntegerType,false),
StructField("2",IntegerType,false),
StructField("3",IntegerType,false),
StructField("4",IntegerType,false),
StructField("5",IntegerType,false),
StructField("6",IntegerType,false),
StructField("7",IntegerType,false)
)
)

override def dataType: DataType = bufferSchema

override def deterministic: Boolean = true

override def initialize(buffer: MutableAggregationBuffer): Unit = {
for (x <- 0 to 7){
buffer(x) = 0
}
}

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = // some stuff

override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = // some stuff

override def evaluate(buffer: Row): Any =
var l = List.empty[Integer]
for (x <- 7 to 0 by -1){
l = buffer.getInt(x) :: l
}
l
}

我的输出应该是这样的:

     +------+---+---+---+---+---+---+---+---+
| view | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
+------+---+---+---+---+---+---+---+---+
| xx | 0 | 0 | 4 | 1 | 0 | 0 | 3 | 0 |
| yy | 0 | 0 | 0 | 3 | 0 | 1 | 0 | 0 |
+------+---+---+---+---+---+---+---+---+

这些值是在上面的更新/合并函数中计算出来的,但它是有效的,没有必要让你看到它。

然后我这样使用它:

val ex = new Extractor()
val df = dataset.groupBy("view").agg(
ex(dataset.col("data"))
)
df.show()

当我执行 df.show() 时,它总是给我一个 IndexOutOfBoundException。我知道这是惰性求值,这就是我在 df.show() 中出错的原因。

据我所知,它可以执行第一组并结束评估功能。但在那之后我得到一个 IndexOutOfBoundException ......

此外,当我更改数据类型并将函数计算为:

override def dataType: DataType =
ArrayType(IntegerType,false)

override def evaluate(buffer: Row): Any = {
var l = ofDim[Integer](8)
for (x <- 0 to 7){
l(x) = buffer.getInt(x)
}
l

输出看起来像这样:

     +------+------------------------------+
| view | Extractor |
+------+------------------------------+
| xx | [0, 0, 4, 1, 0, 0, 3, 0] |
| yy | [0, 0, 0, 3, 0, 1, 0, 0] |
+------+------------------------------+

模式看起来像这样:

root
|-- view: string (nullable = true)
|-- Extractor: array (nullable = true)
| |-- element: integer (containsNull = false)

而且我无法将其转换为我想要的形式。

因为第二种方法有效,我想我在第一种方法中搞砸了 DataType 的一些东西,但我不知道如何修复它......

很多关于我的问题的介绍:

如何获得我想要的输出?我真的不在乎这两种方法中的哪一种(首先使用多个输出列或一个可以转换为我想要的形式的数组),只要它是有效的即可。

谢谢你的帮助

最佳答案

您正在将聚合输出定义为列表:

 override def dataType: DataType = bufferSchema

因为bufferSchema是一个List,所以最后得到的就是这个。您可以稍后更改架构并将列表中的每一列转换为新列。

对于你的错误,两者的区别:

override def evaluate(buffer: Row): Any = 
var l = List.empty[Integer]
for (x <- 7 to 0 by -1){
l = buffer.getInt(x) :: l
}
l

override def evaluate(buffer: Row): Any = 
var l = ofDim[Integer](8)
for (x <- 0 to 7){
l = buffer.getInt(x) :: l
}
l

是在第二个中,您定义了预定义的列数。因此,您确定可以毫无问题地从 0 迭代到 7。

您的第一个示例不是这种情况,因此,我怀疑您可能有错误格式的数据,导致您的缓冲区在 initializemerge 中被错误地初始化。我会建议你添加一个 try/catch 来验证每一步转换缓冲区长度后的大小(至少 initialize 但可以是 update也合并)。

要为列表中的每个元素添加列,您可以使用 withColumn 或通过 map 来完成。

关于scala - UDAF Spark 中的多列输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42749155/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com