gpt4 book ai didi

scala - 如何解决多个联合后的 Apache Spark StackOverflowError

转载 作者:行者123 更新时间:2023-12-01 22:56:48 26 4
gpt4 key购买 nike

我有一个 Spark Scala 程序,它使用 REST API 来批量获取数据,一旦检索到所有数据,我就对其进行操作。

当前计划:

  • 对于每个批处理,创建 RDD 并将其与之前的 RDD 合并使用之前的 API 调用 rdd.union(currentRdd) 创建。

  • 对最终 RDD 进行操作

重现问题的简单程序:

    def main(args: Array[String]) = {
val conf = new SparkConf().setAppName("Union test").setMaster("local[1]")
val sc = new SparkContext(conf)
val limit = 1000;
var rdd = sc.emptyRDD[Int]
for (x <- 1 to limit) {
val currentRdd = sc.parallelize(x to x + 3)
rdd = rdd.union(currentRdd)
}
println(rdd.sum())
}

问题:- 当批处理数量很高时,程序会抛出 StackOverflowError :Exception in thread "main" java.lang.StackOverflowError
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply

我假设,当批处理数量增加时,RDD 依赖关系图变得非常复杂并引发错误。

解决此问题的最佳方法是什么?

最佳答案

已经有 SparkContext.union它知道如何正确计算多个RDD:

val rdds = List.tabulate(limit + 1)(x => sc.parallelize(x to x + 3))
val rdd = sc.union(rdds)

或者,您可以尝试使用 this辅助函数以避免创建长链的union:

val rdds = List.tabulate(limit + 1)(x => sc.parallelize(x to x + 3))
val rdd = balancedReduce(rdds)(_ union _)

它应该起作用的原因本质上与链接的答案相同:O(n) union 链破坏了堆栈,O( log(n))-unions 的高二叉树则不然。

关于scala - 如何解决多个联合后的 Apache Spark StackOverflowError,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57143368/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com