
scala - How to filter an RDD based on a function of another RDD in Spark?

Reposted · Author: 行者123 · Updated: 2023-12-01 09:50:41

I am a beginner with Apache Spark. I want to filter out all groups whose sum of weights is larger than a constant value in an RDD. The "weights" mapping is also an RDD. Here is a small demo: the groups to filter are stored in "groups", and the constant value is 12:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
val wm = weights.toArray.toMap
def isheavy(inp: String): Boolean = {
  val allw = inp.split(",").map(wm(_)).sum
  allw > 12
}
val result = groups.filter(isheavy)

When the input data is very large, e.g. > 10GB, I always run into a "java heap out of memory" error. I suspect it is caused by "weights.toArray.toMap", because that converts the distributed RDD into a Java object in the JVM of the driver. So I tried to filter with the RDD directly:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
def isheavy(inp: String): Boolean = {
  val items = inp.split(",")
  val wm = items.map(x => weights.filter(_._1 == x).first._2)
  wm.sum > 12
}
val result = groups.filter(isheavy)

After loading this script into the spark shell and running result.collect, I got a "java.lang.NullPointerException". I was told that a null pointer exception occurs when one RDD is operated on inside another RDD, and it was suggested that I put the weights into Redis.

So, without converting "weights" into a Map or putting it into Redis, how can I get "result"? Is there a way to filter an RDD based on another map-like RDD without the help of an external data store? Thanks!

Best Answer

This assumes your groups are unique; otherwise, make them unique first via distinct, etc. If either groups or weights is small, it should be easy (see the sketch right below). If both groups and weights are large, you can try the join-based approach further below; it may be more scalable, but it also looks more complicated.
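For the small case, here is a minimal sketch of my own (not part of the original answer), assuming the weights RDD is small enough to collect to the driver and broadcast to the executors:

// Sketch of the small-weights case: collect the pairs once on the driver,
// then broadcast the resulting Map so the filter closure can look weights up locally.
val wmBroadcast = sc.broadcast(weights.collectAsMap())
val smallCaseResult = groups.filter(g => g.split(",").map(wmBroadcast.value(_)).sum > 12)

When both groups and weights are too large for that, the join-based approach below keeps everything distributed: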

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
// map each group to one pair per element: (a, List(a,b,c,d)), (b, List(a,b,c,d)), (c, List(a,b,c,d)), ...
val g1 = groups.flatMap(s => s.split(",").map(x => (x, Seq(s))))
// j will be (a, (List(a,b,c,d), 3)), ...
val j = g1.join(weights)
// k will be (List(a,b,c,d), 3), (List(a,b,c,d), 2), ...
val k = j.map(x => (x._2._1, x._2._2))
// l will be (List(a,b,c,d), Seq(3, 2, 5, 1)), ...
val l = k.groupByKey()
// filter by summing the weights (the 2nd field)
val m = l.filter(x => { var sum = 0; x._2.foreach(a => sum = sum + a); sum > 12 })
// we only need the original group strings
val result = m.map(x => x._1)
// don't do this in a real job, otherwise all results go to the driver; use saveAsTextFile, etc. instead
scala> result.foreach(println)
List(e,g)
List(b,c,e)
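
As a side note, and purely my own variant rather than part of the original answer: the groupByKey step can be replaced with reduceByKey so the per-group sums are combined on the map side before shuffling, which usually scales better when groups contain many elements:

// Sketch: the same join-based idea, but summing with reduceByKey instead of
// collecting every weight per group with groupByKey ("sums" and "result2" are illustrative names).
val sums = g1.join(weights)
  .map { case (_, (group, w)) => (group, w) }
  .reduceByKey(_ + _)
val result2 = sums.filter(_._2 > 12).map(_._1)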

Regarding "scala - How to filter an RDD based on a function of another RDD in Spark?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/26035582/
