scala - How to add a null column to a complex array struct in Spark using a udf

I am trying to add a null column to an embedded array[struct] column, so that I can transform a complex column like this one:

import org.apache.spark.sql.{functions => fx}

case class Additional(id: String, item_value: String)
case class Element(income: String, currency: String, additional: Additional)
case class Additional2(id: String, item_value: String, extra2: String)
case class Element2(income: String, currency: String, additional: Additional2)

// Map each Element to an Element2 whose Additional2 carries a null extra2
val my_uDF = fx.udf((data: Seq[Element]) => {
  data.map(x => Element2(x.income, x.currency,
    Additional2(x.additional.id, x.additional.item_value, null)))
})
sparkSession.sqlContext.udf.register("transformElements", my_uDF)
val result = sparkSession.sqlContext.sql(
  "select transformElements(myElements), line_number, country, idate " +
  "from entity where line_number = '1'")

The goal is to add an extra field named extra2 to Element.Additional, so I map the column with a UDF, but it fails with:

org.apache.spark.SparkException: Failed to execute user defined function(anonfun$1: (array<struct<income:string,currency:string,additional:struct<id:string,item_value:string>>>) => array<struct<income:string,currency:string,additional:struct<id:string,item_value:string,extra2:string>>>)

If I print the schema of the myElements field, it shows:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)

I am trying to transform it into this schema:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)
 |    |    |    |-- extra2: string (nullable = true)
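
The exception arises because Spark hands struct values to a Scala UDF as Row objects, not as the case classes, so the data: Seq[Element] argument cannot be bound. A minimal sketch of the same UDF written against Row (assuming Spark 2.x, where a Row-typed input simply skips the input type check):

import org.apache.spark.sql.Row
import org.apache.spark.sql.{functions => fx}

// Structs arrive as Row, so pattern-match and rebuild the case classes;
// Spark can re-encode the returned Seq[Element2] from its derived schema.
val transformElements = fx.udf((data: Seq[Row]) => {
  data.map { case Row(income: String, currency: String, additional: Row) =>
    Element2(income, currency,
      Additional2(additional.getAs[String]("id"),
        additional.getAs[String]("item_value"),
        null))
  }
})
sparkSession.sqlContext.udf.register("transformElements", transformElements)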

Best Answer

Here is an alternative approach that uses Datasets instead of DataFrames, giving direct access to the objects rather than working with Row. An extra method named asElement2 converts an Element into an Element2:

import sparkSession.implicits._

case class Additional2(id: String, item_value: String, extra2: String)
case class Element2(income: String, currency: String, additional2: Additional2)

case class Additional(id: String, item_value: String)
case class Element(income: String, currency: String, additional: Additional) {
  // Convert this Element to an Element2, filling the new extra2 field with null
  def asElement2(): Element2 = {
    val additional2 = Additional2(additional.id, additional.item_value, null)
    Element2(income, currency, additional2)
  }
}

val df = Seq(
  Seq(Element("150000", "EUR", Additional("001", "500EUR"))),
  Seq(Element("50000", "CHF", Additional("002", "1000CHF")))
).toDS()

df.map {
  se => se.map { _.asElement2 }
}

// or even simpler
df.map { _.map { _.asElement2 } }
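
The table and schema below can be reproduced by showing the mapped Dataset and printing its schema, for example:

val ds2 = df.map { _.map { _.asElement2 } }
ds2.show(false)   // prints the table below
ds2.printSchema() // prints the final schema below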

Output:

+-------------------------------+
|value |
+-------------------------------+
|[[150000, EUR, [001, 500EUR,]]]|
|[[50000, CHF, [002, 1000CHF,]]]|
+-------------------------------+

Final schema:

root
 |-- value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional2: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)
 |    |    |    |-- extra2: string (nullable = true)
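
Since the question's SQL also selects line_number, country and idate, one way to keep those columns with the same typed approach is to map over a case class view of the whole row. This is only a sketch; the EntityRow wrapper (with assumed String column types) is not part of the answer:

// Hypothetical row wrapper for the "entity" table from the question;
// the column types are assumed to be strings.
case class EntityRow(myElements: Seq[Element], line_number: String,
                     country: String, idate: String)

val result = sparkSession.table("entity")
  .where("line_number = '1'")
  .as[EntityRow]
  .map(r => (r.myElements.map(_.asElement2), r.line_number, r.country, r.idate))
  .toDF("myElements", "line_number", "country", "idate")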

Regarding scala - How to add a null column to a complex array struct in Spark using a udf, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56942683/
