gpt4 book ai didi

scala - 如何通过.map在另一个RDD中传递一个RDD

转载 作者:行者123 更新时间:2023-12-01 02:02:45 24 4
gpt4 key购买 nike

我有两个 rdd,我想为 rdd1 的每个项目对 RDD2 项目进行一些计算。所以,我在用户定义的函数中传递 RDD2,如下所示,但我收到类似 rdd1 cannot be passed in another rdd 的错误.如果我想在两个 rdd 上执行操作,我能知道如何实现吗?

例如:
RDD1.map(line =>function(line,RDD2))

最佳答案

正如错误所说,Spark 不支持嵌套 RDD。通常你必须通过重新设计算法来解决它。

如何做取决于实际用例,在 function 中究竟发生了什么|它的输出是什么。

有时是 RDD1.cartesian(RDD2) ,对每个元组进行操作,然后按键减少将起作用。有时,如果您有 (K,V)键入两个 RDD 之间的连接将起作用。

如果 RDD2 很小,您可以随时在驱动程序中收集它,将其设置为广播变量并在 function 中使用该变量。而不是 RDD2 .

@编辑:

例如,让我们假设您的 RDD 包含字符串和 function将计算来自 RDD 的给定记录的次数发生在 RDD2 :

def function(line: String, rdd: RDD[String]): (String, Int) = {
(line, rdd.filter(_ == line).count)
}

这将返回 RDD[(String, Int)] .

Idea1

您可以尝试使用 cartesian product使用 RDD cartesian方法。
val cartesianProduct = RDD1.cartesian(RDD2) // creates RDD[(String, String)]
.map( (r1,r2) => (r1, function2) ) // creates RDD[(String, Int)]
.reduceByKey( (c1,c2) => c1 + c2 ) // final RDD[(String, Int)]

这里 function2需要 r1r2 (它们是字符串)并返回 1如果它们相等并且 0如果不。最终 map 将导致 RDD它将有元组,其中键是来自 r1 的记录和值(value)将是总计数。

问题 1:如果您在 RDD1 中有重复的字符串,这将不起作用, 尽管。你得考虑一下。如 RDD1记录有一些独特的 ID,这将是完美的。

问题 2:这确实创建了很多对(对于两个 RDD 中的 100 亿条记录,它将创建大约 5000 亿对),会很慢并且很可能会导致很多 shuffling .

想法2

我不明白你对 RDD2 大小的评论 lacs所以这可能有效,也可能无效:
val rdd2array = sc.broadcast(RDD2.collect())
val result = RDD1.map(line => function(line, rdd2array))

问题:这可能会破坏你的内存。 collect()driver 上被调用和 all来自 rdd2 的记录将加载到驱动程序节点上的内存中。

Idea3

根据用例,还有其他方法可以解决这个问题,例如 brute force algorithm for Similarity Search与您的用例相似(不是双关语)。对此的替代解决方案之一是 Locality Sensitive Hashing .

关于scala - 如何通过.map在另一个RDD中传递一个RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34823732/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com