gpt4 book ai didi

scala - 如何在 apache spark 中将 RDD[ParentClass] 与 RDD[Subclass] 匹配?

转载 作者:行者123 更新时间:2023-12-04 15:29:51 25 4
gpt4 key购买 nike

我必须将一个 rdd 与其类型相匹配。

trait Fruit

case class Apple(price:Int) extends Fruit
case class Mango(price:Int) extends Fruit

现在 DStream[Fruit] 类型的数据流来了。它是 AppleMango

如何基于子类进行操作?类似下面的内容(不起作用):

dStream.foreachRDD{rdd:RDD[Fruit] =>
rdd match {
case rdd: RDD[Apple] =>
//do something

case rdd: RDD[Mango] =>
//do something

case _ =>
println(rdd.count() + "<<<< not matched anything")
}
}

最佳答案

因为我们有一个 RDD[Fruit] , 任何行都可以是 AppleMango .使用 foreachRDD 时, 每个 RDD将包含这些(以及可能的其他)类型的混合。

为了区分不同的类型,我们可以使用 collect[U](f: PartialFunction[T, U]): RDD[U] (这不要与返回包含 RDD 元素的列表的 collect(): Array[T] 混淆)。此函数将通过应用函数 f 返回包含所有匹配值的 RDD。 (在这种情况下,我们可以在这里使用模式匹配)。

下面是一个小的说明性示例(也将 Orange 添加到水果中)。

设置:

val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val inputData: Queue[RDD[Fruit]] = Queue()
val dStream: InputDStream[Fruit] = ssc.queueStream(inputData)

inputData += spark.sparkContext.parallelize(Seq(Apple(5), Apple(5), Mango(11)))
inputData += spark.sparkContext.parallelize(Seq(Mango(10), Orange(1), Orange(3)))

这会创建一个 RDD[Fruit] 的流有两个独立的RDD

dStream.foreachRDD{rdd: RDD[Fruit] =>
val mix = rdd.collect{
case row: Apple => ("APPLE", row.price) // do any computation on apple rows
case row: Mango => ("MANGO", row.price) // do any computation on mango rows
//case _@row => do something with other rows (will be removed by default).
}
mix foreach println
}

在上面collect ,我们稍微更改每一行(删除类),然后打印结果 RDD .结果:

// First RDD
(MANGO,11)
(APPLE,5)
(APPLE,5)

// Second RDD
(MANGO,10)

可以看出,模式匹配保留并更改了包含 Apple 的行和 Mango同时删除所有 Orange类。


独立的 RDD

如果需要,也可以将两个子类分成各自的子类 RDD如下。然后可以对这两个 RDD 执行任何计算

val apple = rdd.collect{case row: Apple => row}
val mango = rdd.collect{case row: Mango => row}

完整示例代码

trait Fruit
case class Apple(price:Int) extends Fruit
case class Mango(price:Int) extends Fruit
case class Orange(price:Int) extends Fruit

object Test {
def main(args: Array[String]) {
val spark = SparkSession.builder.master("local[*]").getOrCreate()

val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val inputData: Queue[RDD[Fruit]] = Queue()
val inputStream: InputDStream[Fruit] = ssc.queueStream(inputData)

inputData += spark.sparkContext.parallelize(Seq(Apple(5), Apple(5), Mango(11)))
inputData += spark.sparkContext.parallelize(Seq(Mango(10), Orange(1), Orange(3)))

inputStream.foreachRDD{rdd:RDD[Fruit] =>
val mix = rdd.collect{
case row: Apple => ("APPLE", row.price) // do any computation on apple rows
case row: Mango => ("MANGO", row.price) // do any computation on mango rows
//case _@row => do something with other rows (will be removed by default).
}
mix foreach println
}

ssc.start()
ssc.awaitTermination()
}
}

关于scala - 如何在 apache spark 中将 RDD[ParentClass] 与 RDD[Subclass] 匹配?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61444973/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com