gpt4 book ai didi

scala - 将结构传递给 spark 中的 UDAF

转载 作者:可可西里 更新时间:2023-11-01 15:08:32 25 4
gpt4 key购买 nike

我有以下架构 -

root
|-- id:string (nullable = false)
|-- age: long (nullable = true)
|-- cars: struct (nullable = true)
| |-- car1: string (nullable = true)
| |-- car2: string (nullable = true)
| |-- car3: string (nullable = true)
|-- name: string (nullable = true)

如何将结构“汽车”传递给 udaf?如果我只想传递 cars 子结构,inputSchema 应该是什么。

最佳答案

可以,但 UDAF 的逻辑会有所不同。例如,如果您有两行:

val seq = Seq(cars(cars_schema("car1", "car2", "car3")), (cars(cars_schema("car1", "car2", "car3"))))

val rdd = spark.sparkContext.parallelize(seq)

这里是模式

root
|-- cars: struct (nullable = true)
| |-- car1: string (nullable = true)
| |-- car2: string (nullable = true)
| |-- car3: string (nullable = true)

然后如果您尝试调用聚合:

val df = seq.toDF
df.agg(agg0(col("cars")))

您必须像这样更改您的 UDAF 输入模式:

val carsSchema =
StructType(List(StructField("car1", StringType, true), StructField("car2", StringType, true), StructField("car3", StringType, true)))

在你的 UDAF 中,你必须处理这个改变 inputSchema 的模式:

override def inputSchema: StructType = StructType(StructField("input", carsSchema) :: Nil)

在您的更新方法中,您必须处理输入行的格式:

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val i = input.getAs[Array[Array[String]]](0)
// i here would be [car1,car2,car3], an array of strings
buffer(0) = ???
}

从这里,您可以转换 i 以更新您的缓冲区并完成合并和评估功能。

关于scala - 将结构传递给 spark 中的 UDAF,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54518102/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com