gpt4 book ai didi

scala - Spark/Scala、数据集和案例类的多态性

转载 作者:行者123 更新时间:2023-12-04 03:05:47 25 4
gpt4 key购买 nike

我们将 Spark 2.x 与 Scala 一起用于具有 13 种不同 ETL 操作的系统。其中 7 个相对简单,每个都由一个域类驱动,主要区别在于这个类以及负载处理方式的一些细微差别。

加载类的简化版本如下,出于本示例的目的,假设加载了 7 种披萨配料,这里是意大利辣香肠:

object LoadPepperoni {
def apply(inputFile: Dataset[Row],
historicalData: Dataset[Pepperoni],
mergeFun: (Pepperoni, PepperoniRaw) => Pepperoni): Dataset[Pepperoni] = {
val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._

val rawData: Dataset[PepperoniRaw] = inputFile.rdd.map{ case row : Row =>
PepperoniRaw(
weight = row.getAs[String]("weight"),
cost = row.getAs[String]("cost")
)
}.toDS()

val validatedData: Dataset[PepperoniRaw] = ??? // validate the data

val dedupedRawData: Dataset[PepperoniRaw] = ??? // deduplicate the data

val dedupedData: Dataset[Pepperoni] = dedupedRawData.rdd.map{ case datum : PepperoniRaw =>
Pepperoni( value = ???, key1 = ???, key2 = ??? )
}.toDS()

val joinedData = dedupedData.joinWith(historicalData,
historicalData.col("key1") === dedupedData.col("key1") &&
historicalData.col("key2") === dedupedData.col("key2"),
"right_outer"
)

joinedData.map { case (hist, delta) =>
if( /* some condition */) {
hist.copy(value = /* some transformation */)
}
}.flatMap(list => list).toDS()
}
}

换句话说,该类对数据执行一系列操作,这些操作大部分相同并且始终以相同的顺序进行,但每个顶部可能会略有不同,从“原始”到“域”的映射也是如此合并函数。

要为 7 种浇头(即蘑菇、奶酪等)执行此操作,我宁愿不简单地复制/粘贴类并更改所有名称,因为结构和逻辑对所有负载都是通用的。相反,我宁愿定义一个具有通用类型的通用“加载”类,如下所示:

object Load {
def apply[R,D](inputFile: Dataset[Row],
historicalData: Dataset[D],
mergeFun: (D, R) => D): Dataset[D] = {
val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._

val rawData: Dataset[R] = inputFile.rdd.map{ case row : Row =>
...

并且对于每个特定于类的操作,例如从“原始”到“域”的映射或合并,都有一个实现细节的特征或抽象类。这将是一个典型的依赖注入(inject)/多态模式。

但是我遇到了一些问题。从 Spark 2.x 开始,只为原生类型和案例类提供了编码器,无法将一个类一般地标识为案例类。因此,在使用泛型类型时,推断的 toDS() 和其他隐式功能不可用。

也如this related question of mine中所述,案例类 copy 方法在使用泛型时也不可用。

我研究了其他与 Scala 和 Haskell 通用的设计模式,例如类型类或临时多态性,但障碍是 Spark 数据集基本上只能处理无法抽象定义的案例类。

这似乎是 Spark 系统中的常见问题,但我找不到解决方案。任何帮助表示赞赏。

最佳答案

启用 .toDS 的隐式转换是:

implicit def rddToDatasetHolder[T](rdd: RDD[T])(implicit arg0: Encoder[T]): DatasetHolder[T]

(来自 https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLImplicits)

您完全正确,因为 Encoder[T] 的范围内没有隐式值现在您已经使您的 apply 方法通用,所以这种转换不会发生。但是您可以简单地接受一个作为隐式参数!

object Load {
def apply[R,D](inputFile: Dataset[Row],
historicalData: Dataset[D],
mergeFun: (D, R) => D)(implicit enc: Encoder[D]): Dataset[D] = {
...

然后在您调用具有特定类型的负载时,它应该能够找到该类型的编码器。请注意,您必须 import sparkSession.implicits._在调用上下文中也是如此。

编辑:类似的方法是启用隐式 newProductEncoder[T <: Product](implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[T]): Encoder[T]通过绑定(bind)您的类型( apply[R, D <: Product] )并接受隐式 JavaUniverse.TypeTag[D] 来工作作为参数。

关于scala - Spark/Scala、数据集和案例类的多态性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44708215/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com