gpt4 book ai didi

scala - 如何更改 DataFrame 的模式(修复一些嵌套字段的名称)?

转载 作者:行者123 更新时间:2023-12-01 10:25:02 24 4
gpt4 key购买 nike

我有一个问题,当我们将 Json 文件加载到 Spark 中时,将其存储为 Parquet,然后尝试从 Impala 访问 Parquet 文件; Impala 提示列的名称,因为它们包含在 SQL 中非法的字符。

JSON 文件的“特征”之一是它们没有预定义的架构。我想让 Spark 创建模式,然后我必须修改具有非法字符的字段名称。

我的第一个想法是对 DataFrame 中的字段名称使用 withColumnRenamed ,但我认为这只适用于顶级字段,所以我不能使用它,因为 Json 包含嵌套数据。

所以我创建了以下代码来重新创建 DataFrames 模式,递归地遍历结构。然后我使用该新架构重新创建 DataFrame。

(代码根据 Jacek 建议的使用 Scala 复制构造函数的改进进行了更新。)

def replaceIllegal(s: String): String = s.replace("-", "_").replace("&", "_").replace("\"", "_").replace("[", "_").replace("[", "_")
def removeIllegalCharsInColumnNames(schema: StructType): StructType = {
StructType(schema.fields.map { field =>
field.dataType match {
case struct: StructType =>
field.copy(name = replaceIllegal(field.name), dataType = removeIllegalCharsInColumnNames(struct))
case _ =>
field.copy(name = replaceIllegal(field.name))
}
})
}

sparkSession.createDataFrame(df.rdd, removeIllegalCharsInColumnNames(df.schema))

这行得通。但是有没有更好/更简单的方法来实现我想做的事情?

有没有更好的方法来替换 DataFrame 上的现有模式?以下代码无效:

df.select($"*".cast(removeIllegalCharsInColumnNames(df.schema)))

它给出了这个错误:

org.apache.spark.sql.AnalysisException: Invalid usage of '*' in expression 'cast'

最佳答案

我认为最好的办法是将数据集(在保存为 parquet 文件之前)转换为 RDD,并根据需要使用自定义模式来描述结构。

val targetSchema: StructType = ...
val fromJson: DataFrame = ...
val targetDataset = spark.createDataFrame(fromJson.rdd, targetSchema)

请参阅 SparkSession.createDataFrame 中的示例作为引用,但是当您要从数据集创建它时,它直接使用 RDD。

val schema =
StructType(
StructField("name", StringType, false) ::
StructField("age", IntegerType, true) :: Nil)

val people =
sc.textFile("examples/src/main/resources/people.txt").map(
_.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val dataFrame = sparkSession.createDataFrame(people, schema)
dataFrame.printSchema
// root
// |-- name: string (nullable = false)
// |-- age: integer (nullable = true)

但是正如您在评论中提到的(我后来将其合并到您的问题中):

JSON files don't have a predefined schema.

话虽如此,我认为您的解决方案是正确的。 Spark 不提供开箱即用的任何类似功能,我认为更多的是开发自定义 Scala 代码,该代码将遍历 StructType/StructField 树并更改不正确的内容。

我建议在您的代码中更改的是使用 copy 构造函数(Scala 的案例类的一个功能 - 参见 A Scala case class ‘copy’ method example ),它只会更改不正确的名称而其他属性保持不变.

使用 copy 构造函数将(大致)对应于以下代码:

// was
// case s: StructType =>
// StructField(replaceIllegal(field.name), removeIllegalCharsInColumnNames(s), field.nullable, field.metadata)
s.copy(name = replaceIllegal(field.name), dataType = removeIllegalCharsInColumnNames(s))

函数式语言(一般而言)和 Scala(特别是)中有一些设计模式可以处理深层嵌套结构操作,但这可能太多了(我犹豫要不要分享)。

因此,我认为问题的当前“形式”更多是关于如何将树作为数据结构而不一定是 Spark 模式来操作。

关于scala - 如何更改 DataFrame 的模式(修复一些嵌套字段的名称)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45218373/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com