scala - What does CoGroupedRDD do?

Reposted. Author: 行者123. Updated: 2023-12-01 10:00:05

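Can anyone explain in simple terms what a CoGroupedRDD does? The code below joins several RDDs, and CoGroupedRDD shows up in the output of toDebugString: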

val schema = "some_schema"
val RDD = {sc.cassandraTable[(String, String, Int, Int, Int, Int)](schema, "Event_table").select("column1" as "_1", "column2" as "_2", "column3" as "_3", "column4" as "_4", "column5" as "_5","column6" as "_6").keyBy[Tuple2[Int,Int]]("column5","column6")}
val RDD2 = {sc.cassandraTable[(Int,Int)](ks, "crew_table").select ("crewid_1" as "_1", "crewid_2" as "_2", "crewid_desc").keyBy[Tuple2[Int, Int]]("crewid_1", "crewid_2")}
val joinedRDD = RDD.leftOuterJoin(RDD2)
joinedRDD.take(10).foreach(println)
val RDD3 = {sc.cassandraTable[(Int,String)](ks, "Crew").select ("crewid_1" as "_1", "crewid_2" as "_2").keyBy[Tuple1[Int]]("crew_id")}
val mjoin = joinedRDD.map { x => (x._1._1, x._2) }


val result = mjoin.join(RDD3)
result.toDebugString
    res19: String = 
(6) MapPartitionsRDD[27] at leftOuterJoin at <console>:66 []
| MapPartitionsRDD[26] at leftOuterJoin at <console>:66 []
| CoGroupedRDD[25] at leftOuterJoin at <console>:66 []
+-(6) MapPartitionsRDD[21] at map at <console>:60 []
| | MapPartitionsRDD[17] at leftOuterJoin at <console>:58 []
| | MapPartitionsRDD[16] at leftOuterJoin at <console>:58 []
| | CoGroupedRDD[15] at leftOuterJoin at <console>:58 []
| +-(6) CassandraTableScanRDD[2] at RDD at CassandraRDD.scala:15 []
| +-(6) CassandraTableScanRDD[5] at RDD at CassandraRDD.scala:15 []
+-(6) CassandraTableScanRDD[11] at RDD at CassandraRDD.scala:15 []

Best answer

In its simplest form, cogroup has the following signature:

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] 

where "self" is an RDD[(K, V)]. In short, it takes RDDs of key-value pairs and groups the values by key, keeping the values from each source logically separate:

val rdd1 = sc.parallelize(Seq((1, "foo"), (1, "bar"), (2, "foobar")))
val rdd2 = sc.parallelize(Seq((1, 1), (1, 2), (3, 3)))
rdd1.cogroup(rdd2).collect.foreach(println)
(1,(CompactBuffer(foo, bar),CompactBuffer(1, 2)))
(2,(CompactBuffer(foobar),CompactBuffer()))
(3,(CompactBuffer(),CompactBuffer(3)))
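To make the grouping step concrete without a cluster, here is a minimal sketch of the same idea on plain Scala collections. The object name CoGroupSketch and this local cogroup function are hypothetical stand-ins written for illustration; they are not Spark's implementation, which also has to partition and shuffle the data.

```scala
object CoGroupSketch {
  // Hypothetical local stand-in for RDD.cogroup: group each side by key,
  // then pair up the two groups for every key seen on either side.
  def cogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    val l: Map[K, Seq[V]] = left.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }
    val r: Map[K, Seq[W]] = right.groupBy(_._1).map { case (k, kws) => (k, kws.map(_._2)) }
    // A key missing from one side gets an empty sequence for that side.
    (l.keySet ++ r.keySet).map { k =>
      (k, (l.getOrElse(k, Seq.empty[V]), r.getOrElse(k, Seq.empty[W])))
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val left = Seq((1, "foo"), (1, "bar"), (2, "foobar"))
    val right = Seq((1, 1), (1, 2), (3, 3))
    cogroup(left, right).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Keys 2 and 3 each appear on only one side, so one of their two sequences is empty, matching the empty CompactBuffer entries in the Spark output.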

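This mechanism is used to implement joins. Once the data has been cogrouped, you can simply flatten it for each key, where lvs and rvs are the left and right value sequences:

for { lv <- lvs; rv <- rvs } yield (key, (lv, rv))

to get an inner join. Outer joins follow the same procedure, with minor adjustments for empty sequences.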
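The flattening step can be sketched on a local collection as well. The example below starts from the cogrouped result printed earlier, written out by hand as a Seq, and derives an inner join and a left outer join from it. The object name JoinFromCoGroup and the use of Option for the missing side are illustrative assumptions, not a copy of Spark's source.

```scala
object JoinFromCoGroup {
  // The cogrouped data from the earlier example, as a plain local collection.
  val cogrouped: Seq[(Int, (Seq[String], Seq[Int]))] = Seq(
    (1, (Seq("foo", "bar"), Seq(1, 2))),
    (2, (Seq("foobar"), Seq.empty[Int])),
    (3, (Seq.empty[String], Seq(3)))
  )

  // Inner join: a key with an empty side yields no pairs at all.
  val inner: Seq[(Int, (String, Int))] =
    for {
      (key, (lvs, rvs)) <- cogrouped
      lv <- lvs
      rv <- rvs
    } yield (key, (lv, rv))

  // Left outer join: an empty right side is replaced by a single None,
  // so every left value still produces one row.
  val leftOuter: Seq[(Int, (String, Option[Int]))] =
    for {
      (key, (lvs, rvs)) <- cogrouped
      lv <- lvs
      rv <- if (rvs.isEmpty) Seq(Option.empty[Int]) else rvs.map(Option(_))
    } yield (key, (lv, rv))

  def main(args: Array[String]): Unit = {
    inner.foreach(println)
    leftOuter.foreach(println)
  }
}
```

Key 2 disappears from the inner join because its right side is empty, but survives in the left outer join as (2,(foobar,None)). Key 3 is dropped in both because it has no left values.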

For "scala - What does CoGroupedRDD do?", the original question is on Stack Overflow: https://stackoverflow.com/questions/42522664/
