gpt4 book ai didi

scala - 在 Spark/Scala 中使用 ForEach 时的执行流程

转载 作者:行者123 更新时间:2023-12-02 19:14:07 25 4
gpt4 key购买 nike

我对集群上的执行流程有一个奇怪的问题。

方法 A 调用
- 在 FOREACH 中调用的方法 B
- 方法 C

执行流程应该是

Method A --> Method B --> Method C 

但它是这样工作的:
1) Method A --> Method B (skips Method C) and continues to rest of Method B.
2) Method C is executed later separately.

因为流程不是正确的流程, accum1.value在方法 B 中显示为 blank/null .
**CLASS A::METHOD A:**

object TakeDFs {

def takeDFs(df: DataFrame): Unit = {
println("---------------- takeNettedDFs::START ---------------- ")

for(i <- 0 until bySecurityArray.length) {
allocProcessDF = bySecurityArray(i).toDF()
....

//WORKS
AllocOneProcess.getAllocOneDFs(allocProcessDF)

}
println("---------------- takeNettedDFs::END ---------------- ")

}
}

**CLASS B::METHOD B:**

object AllocOneProcess {

def getAllocOneDFs(df: DataFrame): Unit = {
println("---------------- getAllocOneDFs::START ---------------- ")

df.coalesce(1).sort($"PRIORITY" asc).foreach( {
row => AllocOneTest.allocProcessTest(row)
})

println("------------- getAllocOneDFs::accum1.value -------------" + accum1.value)

println("---------------- getAllocOneDFs::END ---------------- ")

}
}

**CLASS C::METHOD C:**

object AllocOneTest {

def allocProcessTest(row: Row): Unit = {
println("---------------- AllocOneTest::allocProcessTest::START ---------------- ")

accum1.add(RegRptPilotConstants.PairProcessCaseClass(row(0).asInstanceOf[String], row(1).asInstanceOf[String], row(2).asInstanceOf[String]))


println("---------------- AllocationOneTest::allocProcessTest::END ---------------- ")

}
}

**CLASS D::**

object RegRptPilotConstants {
var pairedOneSeq = Seq[PairProcessCaseClass]()
val accum1 = new ProcessAccumulator[ProcessCaseClass]()

}

最佳答案

对于上面的代码,foreach 是触发 Spark DAG 执行整个流程的 Action 。 Action ,即上述场景中的 foreach 是在每个分区上同时并行执行/调用的,因此最终调用了 foreach 内部的方法。

流程:方法 A --> 方法 B --> 方法 C、方法 C、方法 C ...

df.coalesce(1).sort($"PRIORITY" asc).foreach( {
row => AllocOneTest.allocProcessTest(row)
})

引用: https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions

关于scala - 在 Spark/Scala 中使用 ForEach 时的执行流程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58237522/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com