gpt4 book ai didi

scala - 将 Parquet 作为 Scala 中的案例类对象列表读取

转载 作者:行者123 更新时间:2023-12-05 01:34:49 29 4
gpt4 key购买 nike

假设你已经写了一些 case 类的集合到 parquet,然后想在另一个 spark 作业中读取它,回到同一个 case 类(也就是说,你已经写了一些 List[MyCaseClass] 并且想读回来)。

一般而言,假设 MyCaseClass其中有嵌套的 case 类。

目前,我只能使用此代码蓝图才能使其正常工作:

  /** applies the secret sauce for coercing to a case class that is implemented by spark's flatMap */
private def toCaseClass(spark : SparkSession, inputDF : DataFrame) : Dataset[MyCaseClass] = {
import spark.implicits._
inputDF.as[MyCaseClass].flatMap(record => {
Iterator[MyCaseClass](record)
})
}

似乎在 Spark 2.x 中, flatMap将导致进行转换/强制转换的实验性 Spark 代码(当使用调试器查看时,该代码在 Spark 代码库中被注释为实验性的)。显然,序列化在 Java/Scala 中通常是一个棘手的问题。有没有额外的、安全的方法?

在 spark 之外,我发现在 stackoverflow 上其他地方建议的独立代码解决方案不稳定且支持不佳。

我正在寻找干净的、声明性的、不需要手动编码如何转换每个字段的方法,这些方法依赖于支持良好的实体库,不依赖于以一种击败优雅的方式的超慢反射。可能是一种不可能的需求组合,但这只是一种以它的案例类为荣并且将 Spark 作为其主要成就之一的语言的预期。

也欢迎评论为什么不使用案例类!

最佳答案

正如 Luis Miguel 所评论的那样,大多数数据集 API 都被标记为实验性的,但已经稳定并在生产中使用了好几年。

Dataset.as[U] 的问题

简单地使用 .as[MyCaseClass] 是完全正确的与显式实例化 case 类有几个细微的区别:最重要的是 Dataset.as[U]不保证您的数据集仅包含由类型 U 定义的列,它可能会保留可能会在以后破坏计算的其他数据。

下面是一个例子:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

case class MyData(value: Int)

val df: DataFrame = spark.createDataset(Seq(1,1,2,3)).withColumn("hidden",rand)

val ds: Dataset[MyData] = df.as[MyData]

ds.distinct.count
res3: Long = 4

数据集 ds保留 hidden列值,即使它没有在 MyData 类型中定义并且可能会产生意外的结果:有人在查看数据集 ds作为 MyData的集合上面肯定会期望不同计数为 3 而不是 4。

如何安全地转换为 Dataset[MyData] ?

如果您明确只想将案例类列保留在数据集中,则有一个非常简单的解决方案(但性能欠佳):将其提取为 RDD 并将其重新转换为数据集 [U]。
val ds = df.as[MyData].rdd.toDS()

ds.distinct.count
res5: Long = 3

它基本上完全符合您的 flatMap以相同的成本做:Spark 需要从其内部行格式反序列化数据以创建案例类实例并将其重新序列化为内部行。
它会产生不必要的垃圾、增加内存压力并可能破坏 WholeStage 代码生成优化。

在我看来,更好的方法是在将 Dataset 转换为指定的 case 类时从源 DataFrame 中选择必要的列。这将防止 as[U] 的大多数不需要的副作用。但没有反序列化/序列化成本。

一个优雅的方法是利用 Scala 的能力来扩展现有类和实例的行为 implicit classes :
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql._

object SparkExtensions {
implicit class ExtendedDataFrame(df: DataFrame) {
def to[T <: Product: TypeTag]: Dataset[T] = {
import df.sparkSession.implicits._
import org.apache.spark.sql.functions.col
df.select(Encoders.product[T].schema.map(f => col(f.name)): _*).as[T]
}
}
}

有了上面的对象,我现在可以修改我的初始代码:
import SparkExtensions._

val ds: Dataset[MyData] = df.to[MyData]

ds.distinct.count
res11: Long = 3

ds.printSchema
root
|-- value: integer (nullable = false)

关于scala - 将 Parquet 作为 Scala 中的案例类对象列表读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57450104/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com