gpt4 book ai didi

java - Spark结构化流: Current batch is falling behind

转载 作者:行者123 更新时间:2023-11-30 05:53:05 25 4
gpt4 key购买 nike

这似乎是非常简单的实现,但看起来存在一些问题。

此作业从 kafka 主题读取偏移量(ui 事件数据),进行一些聚合并将其写入 Aerospike 数据库。

在高流量的情况下,我开始看到这个问题:作业运行良好,但没有插入新数据。查看日志,我看到以下警告消息:

Current batch is falling behind. The trigger interval is 30000 milliseconds, but spent 43491 milliseconds

有几次作业恢复写入数据,但我可以看到计数很低,这表明存在一些数据丢失。

这是代码:

Dataset<Row> stream = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServersString)
.option("subscribe", newTopic)
.option("startingOffsets", "latest")
.option("enable.auto.commit", false)
.option("failOnDataLoss", false)
.load();
StreamingQuery query = stream
.writeStream()
.option("startingOffsets", "earliest")
.outputMode(OutputMode.Append())
.foreach(sink)
.trigger(Trigger.ProcessingTime(triggerInterval))
.queryName(queryName)
.start();

最佳答案

您可能需要处理maxOffsetsPerTrigger来调整每批的总输入记录。否则,您的应用程序的滞后可能会在批处理中带来更多记录,从而减慢下一批的速度,进而在后续批处理中带来更多滞后。

有关 Kafka 配置的更多详细信息,请参阅以下链接。

https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html

关于java - Spark结构化流: Current batch is falling behind,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53599569/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com