gpt4 book ai didi

json - Spark 默认空列数据集

转载 作者:行者123 更新时间:2023-12-04 02:04:08 24 4
gpt4 key购买 nike

我无法让 Spark 将 json(或 csv)读取为具有 Dataset 字段的案例类的 Option[_],其中并非所有字段都在源中定义。

这有点神秘,但假设我有一个名为 CustomData 的案例类

给定以下 json 文件 ( customA.json ):

{"id":123, "colA": "x", "colB": "z"}
{"id":456, "colA": "y"}
{"id":789, "colB": "a"}

以及以下代码:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
.master("local[2]")
.appName("test")
.getOrCreate()

import spark.implicits._

case class CustomData(id: BigInt, colA: Option[String], colB: Option[String])
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

val ds = spark
.read
.option("mode", "PERMISSIVE")
.json("src/main/resources/customA.json")
.as[CustomData]
.show()

输出是 - 正如预期的 - :
+----+----+---+
|colA|colB| id|
+----+----+---+
| x| z|123|
| y|null|456|
|null| a|789|
+----+----+---+

即使并非总是定义所有列。
但是,如果我想使用相同的代码来读取其中一列无处出现的文件,我无法实现:

对于另一个 json 文件( customB.json ):

{"id":321, "colA": "x"}
{"id":654, "colA": "y"}
{"id":987}

以及附加代码:

  val ds2 = spark
.read
.option("mode", "PERMISSIVE")
.json("src/main/resources/customB.json")
.as[CustomData]
.show()

输出是一个错误:

org.apache.spark.sql.AnalysisException:无法解析“colB”给定的输入列:[colA,id];

这是有道理的,但我很想为两个文件重用相同的案例类。特别是如果我不知道 colB 是否在摄取之前出现在 json 文件中。

当然我可以进行检查,但是有没有办法将不存在的列转换为 null (与 customA.json 一样)。将 readmode 设置为 Permissive 似乎没有任何改变。

我错过了什么吗?

最佳答案

我会放 在这里回答。向您展示什么(某种)有效,但看起来非常恕我直言。

通过使用一种方法扩展 DataFrame 来强制 StructType在已经存在的 StructType 之上的案例类它确实有效,但也许(我真的希望)有更好/更清洁的解决方案。

开始:

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.catalyst.ScalaReflection
import scala.reflect.runtime.universe._

case class DataFrameExtended(dataFrame: DataFrame) {

def forceMergeSchema[T: TypeTag]: DataFrame = {
ScalaReflection
.schemaFor[T]
.dataType
.asInstanceOf[StructType]
.filterNot(
field => dataFrame.columns.contains(field.name)
)
.foldLeft(dataFrame){
case (newDf, field) => newDf.withColumn(field.name, lit(null).cast(field.dataType))
}
}
}

implicit def dataFrameExtended(df: DataFrame): DataFrameExtended = {
DataFrameExtended(df)
}

val ds2 = spark
.read
.option("mode", "PERMISSIVE")
.json("src/main/resources/customB.json")
.forceMergeSchema[CustomData]
.as[CustomData]
.show()

现在显示我希望的结果:
+----+---+----+
|colA| id|colB|
+----+---+----+
| x|321|null|
| y|654|null|
|null|987|null|
+----+---+----+

我只尝试过使用标量类型(如(Int、String 等))我认为更复杂的结构会失败得可怕。所以我仍在寻找更好的答案。

关于json - Spark 默认空列数据集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44886211/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com