gpt4 book ai didi

scala - 加入多个rdd

转载 作者:行者123 更新时间:2023-12-01 03:36:26 25 4
gpt4 key购买 nike

我有 4rdds 类型的 RDD:((int,int,int),values) 并且我的 rdds 是

rdd1: ((a,b,c), value) 
rdd2:((a,d,e),valueA)
rdd3:((f,b,g),valueB)
rdd4:((h,i,c),valueC)

我如何加入 rdds,如 rdd1 在“a”上加入 rdd2 rdd1 在“b”上加入 rdd2,在“c”上加入 rdd3

所以输出是 finalRdd: ((a,b,c),valueA,valueB,valueC,value))在斯卡拉?

我尝试使用 collectAsMap 执行此操作,但效果不佳并引发异常

仅用于 rdd1 的代码加入 rdd2
val newrdd2=rdd2.map{case( (a,b,c),d)=>(a,d)}.collectAsMap
val joined=rdd1.map{case( (a,b,c),d)=>(newrdd2.get(a).get,b,c,d)}

例子
rdd1: ((1,2,3),animals)
rdd2:((1,anyInt,anyInt),cat)
rdd3:((anyInt,2,anyInt),cow )
rdd 4: ((anyInt,anyInt,3),parrot)

输出应该是 ((1,2,3),animals,cat,cow,parrot )

最佳答案

有一个方便的join RDD 上的方法,但你需要它由你特定的连接键作为键,这是 Spark 用于分区和改组的。

来自 the docs :

join(otherDataset, [numTasks]) : When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.



我无法编译我所在的位置,但手动它是这样的:
val rdd1KeyA = rdd1.map(x => (x._1._1, (x._1._2, x._1._3. x._2) // RDD(a, (b,c,value))
val rdd2KeyA = rdd2.map(x => (x._1._1, x._2) // RDD(a, valueA)
val joined1 = rdd1KeyA.join(rdd2KeyA) // RDD(a, ((b,c,value), valueA))

val rdd3KeyB = rdd3.map(x => (x._1._2, x._2) // RDD(b, valueB)
val joined1KeyB = joined1.map(x => (x._2._1._1, (x._1, x._2._1._2, x._2._1._3. x._2._2) // RDD(b, (a, c, value, valueA))
val joined2 = joined1KeyB.join(rdd3keyB) // RDD(b, ((a, c, value, valueA), valueB))

...等等

避免 collect*函数,因为它们不使用数据的分布式特性,并且在大负载时容易失败,它们将 RDD 上的所有数据混洗到主节点上的内存中集合,可能会炸毁一切。

关于scala - 加入多个rdd,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34356374/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com