scala - Create a Spark DataFrame from a nested array of struct elements?

Reprinted. Author: 行者123. Updated: 2023-12-04 17:38:58

I have read a JSON file into Spark. The file has the following structure:

   root
|-- engagement: struct (nullable = true)
| |-- engagementItems: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- availabilityEngagement: struct (nullable = true)
| | | | |-- dimapraUnit: struct (nullable = true)
| | | | | |-- code: string (nullable = true)
| | | | | |-- constrained: boolean (nullable = true)
| | | | | |-- id: long (nullable = true)
| | | | | |-- label: string (nullable = true)
| | | | | |-- ranking: long (nullable = true)
| | | | | |-- type: string (nullable = true)
| | | | | |-- version: long (nullable = true)
| | | | | |-- visible: boolean (nullable = true)

I created a recursive function to flatten the schema's nested StructType columns:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap { f =>
    val colName = if (prefix == null) f.name else prefix + "." + f.name

    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _              => Array(col(colName).alias(colName))
    }
  }
}

val newDF = SIWINSDF.select(flattenSchema(SIWINSDF.schema): _*)

val secondDF = newDF.toDF(newDF.columns.map(_.replace(".", "_")): _*)

How can I flatten an ArrayType that contains a nested StructType, for example engagementItems: array (nullable = true)?

Any help is appreciated.

Best Answer

The problem here is that you need to handle the ArrayType case as well, and then cast its element type to StructType. You can use a Scala runtime cast (asInstanceOf) for that.
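The shape of that fix can first be sketched on a toy schema model in plain Scala. These `Struct`/`Arr`/`Leaf` types are illustrative stand-ins for Spark's schema types, not Spark's own:

```scala
// Toy stand-ins for StructType / ArrayType / leaf types (illustrative only).
sealed trait FieldType
case class Struct(fields: List[(String, FieldType)]) extends FieldType
case class Arr(element: FieldType) extends FieldType
case object Leaf extends FieldType

// Same recursion as flattenSchema: descend into structs, and for an array
// descend into its element type when that element is itself a struct.
def paths(s: Struct, prefix: String = ""): List[String] =
  s.fields.flatMap { case (name, t) =>
    val p = if (prefix.isEmpty) name else s"$prefix.$name"
    t match {
      case st: Struct      => paths(st, p)
      case Arr(st: Struct) => paths(st, p) // the ArrayType case
      case _               => List(p)
    }
  }

val schema = Struct(List(
  "engagement" -> Struct(List(
    "engagementItems" -> Arr(Struct(List("code" -> Leaf, "id" -> Leaf)))
  ))
))

paths(schema)
// List(engagement.engagementItems.code, engagement.engagementItems.id)
```

The array case simply "looks through" the array and keeps recursing on the element struct, which is exactly what the modified function below does with Spark's real types.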

First, I reproduced your scenario (by the way, including this in your question would be very helpful, since it makes the problem much easier to reproduce):

import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.ScalaReflection

case class DimapraUnit(code: String, constrained: Boolean, id: Long, label: String, ranking: Long, _type: String, version: Long, visible: Boolean)
case class AvailabilityEngagement(dimapraUnit: DimapraUnit)
case class Element(availabilityEngagement: AvailabilityEngagement)
case class Engagement(engagementItems: Array[Element])
case class root(engagement: Engagement)

def getSchema(): StructType = {
  val schema = ScalaReflection.schemaFor[root].dataType.asInstanceOf[StructType]
  schema.printTreeString()
  schema
}

This prints:

root
|-- engagement: struct (nullable = true)
| |-- engagementItems: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- availabilityEngagement: struct (nullable = true)
| | | | |-- dimapraUnit: struct (nullable = true)
| | | | | |-- code: string (nullable = true)
| | | | | |-- constrained: boolean (nullable = false)
| | | | | |-- id: long (nullable = false)
| | | | | |-- label: string (nullable = true)
| | | | | |-- ranking: long (nullable = false)
| | | | | |-- _type: string (nullable = true)
| | | | | |-- version: long (nullable = false)
| | | | | |-- visible: boolean (nullable = false)

Then I modified your function by adding an extra case for ArrayType, casting its element type to StructType with asInstanceOf:

import org.apache.spark.sql.Column
import org.apache.spark.sql.types._

def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap { f =>
    val colName = if (prefix == null) f.name else prefix + "." + f.name

    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case at: ArrayType =>
        // Assumes the array's elements are structs; recurse into the element type.
        val st = at.elementType.asInstanceOf[StructType]
        flattenSchema(st, colName)
      case _ => Array(new Column(colName).alias(colName))
    }
  }
}
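One caveat: asInstanceOf is an unchecked runtime cast, so the modified function assumes every array it meets holds structs; an array of primitives (say, an array of strings) would throw a ClassCastException at run time. A minimal plain-Scala illustration of that failure mode (the values here are illustrative stand-ins, not Spark types):

```scala
import scala.util.Try

// A cast that matches the runtime type succeeds; one that does not throws
// ClassCastException, which Try captures as a Failure.
val structLike: Any = Map("code" -> "abc") // stands in for a struct element
val primitive: Any  = "just a string"      // stands in for a non-struct element

val ok  = Try(structLike.asInstanceOf[Map[String, String]])
val bad = Try(primitive.asInstanceOf[Map[String, String]])

println(ok.isSuccess)  // true
println(bad.isFailure) // true
```

If your real schemas may contain arrays of non-struct elements, a safer variant would match on `ArrayType(st: StructType, _)` and fall through to the default case otherwise.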

And finally, the result:

val s = getSchema()
val res = flattenSchema(s)

res.foreach(println(_))

Output:

engagement.engagementItems.availabilityEngagement.dimapraUnit.code AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.code`
engagement.engagementItems.availabilityEngagement.dimapraUnit.constrained AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.constrained`
engagement.engagementItems.availabilityEngagement.dimapraUnit.id AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.id`
engagement.engagementItems.availabilityEngagement.dimapraUnit.label AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.label`
engagement.engagementItems.availabilityEngagement.dimapraUnit.ranking AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.ranking`
engagement.engagementItems.availabilityEngagement.dimapraUnit._type AS `engagement.engagementItems.availabilityEngagement.dimapraUnit._type`
engagement.engagementItems.availabilityEngagement.dimapraUnit.version AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.version`
engagement.engagementItems.availabilityEngagement.dimapraUnit.visible AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.visible`

Regarding "scala - Create a Spark DataFrame from a nested array of struct elements?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/55398372/
