gpt4 book ai didi

apache-spark - Spark 累加器,我总是得到 0 值

转载 作者:行者123 更新时间:2023-12-04 04:21:04 25 4
gpt4 key购买 nike

我正在使用 LongAccumulator 来计算我在 Cassandra 中保存的记录数。

object Main extends App {
val conf = args(0)
val ssc = StreamingContext.getStreamingContext(conf)
Runner.apply(conf).startJob(ssc)

StreamingContext.startStreamingContext(ssc)
StreamingContext.stopStreamingContext(ssc)
}
class Runner (conf: Conf) {
override def startJob(ssc: StreamingContext): Unit = {
accTotal = ssc.sparkContext.longAccumulator("total")
val inputKafka = createDirectStream(ssc, kafkaParams, topicsSet)
val rddAvro = inputKafka.map{x => x.value()}
saveToCassandra(rddAvro)
println("XXX:" + accTotal.value) //-->0
}


def saveToCassandra(upserts: DStream[Data]) = {
val rddCassandraUpsert = upserts.map {
record =>
accTotal.add(1)
println("ACC: " + accTotal.value) --> 1,2,3,4.. OK. Spark Web UI, ok too.
DataExt(record.data,
record.data1)}
rddCassandraUpsert.saveToCassandra(keyspace, table)
}
}

我看到代码执行正确,我将数据保存在 Cassandra 中,当我最终打印累加器时,该值是 0,但如果我在 map 函数中打印它,我可以看到正确的值。为什么?

我正在使用 Spark 2.0.2 并在本地模式下从 Intellj 执行。我检查了 spark web UI,我可以看到 accumulador 已更新。

最佳答案

问题大概出在这里:

object Main extends App {
...

Spark doesn't support applications extending App ,这样做会导致不确定的行为:

Note that applications should define a main() method instead of extending scala.App. Subclasses of scala.App may not work correctly.

您应该始终将标准应用程序与 main 一起使用:

object Main {
def main(args: Array[String]) {
...

关于apache-spark - Spark 累加器,我总是得到 0 值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50269728/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com