gpt4 book ai didi

java - Spark-Kafka-Streaming : Offset Management - Can't get manual commit to work (Java)

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:48:29 24 4
gpt4 key购买 nike

我们正在使用 JavaInputDStream > 从 Apache Kafka 读取消息(值:JSON-String),加入一些 OracleDB 数据并写入 ElasticSearch。

我们按照 Spark Streaming - Kafka Integration Guide 中的描述实现了抵消管理但现在我们才意识到偏移量管理对我们不起作用,如果当前小批量失败,Stream 不会再次读取消息。即使我们跳过这一行,它也不会再次读取消息:

((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);

我们将代码分解为以下内容,并期望流最终进入一个循环,一次又一次地读取相同的消息,但事实并非如此:

stream.foreachRDD(recordRDD -> {
final OffsetRange[] offsetRanges = ((HasOffsetRanges) recordRDD.rdd()).offsetRanges();
if (!recordRDD.isEmpty()) {
LOGGER.info("Processing some Data: " + recordRDD.rdd().count());
}
});

Consumer config-param enable.auto.commit 设置为false,这在初始化JavaInputDStream 后也显示在Log 中。我们在测试中的嵌入式 Kafka Broker 和开发阶段的 Kafka-Server 面临着同样的问题。两者目前都以独立模式运行。

我们尝试的是:

  • 代理配置:增加 offsets.commit.timeout.ms
  • 消费者/流配置:将 isolation.level 设置为“read_committed”
  • 消费者/流配置:将 auto.offset.reset 设置为最早
  • Spark:将 spark.streaming.unpersist 设置为 false
  • Spark:增加 spark.streaming.kafka.maxRetries 的值
  • Stream:将 streamingPhaseDuration 调整为比 Mini-Batch 需要的时间更长
  • 流:启用检查点
  • 流:更改了 LocationStrategies

这些都不起作用,而且我们似乎搜索了整个网络却没有找到问题的原因。 Stream 似乎忽略了 enable.auto.commit 配置,只是在读取当前 RDD 的消息后提交。无论我们尝试什么,我们的流只会将每条消息只读一次。

我是否遗漏了任何不同的方法或事实?

最佳答案

经过更多测试后,我们发现手动提交只有在流在实际批处理期间停止/崩溃时才有效。如果流停止并重新开始,它会再次使用失败的数据。

因此,我们目前正在做的是在检测到故障时直接停止流 javaStreamingContext.stop(false)。在此之后,调度程序会再次启动流,它会验证流在规定的时间段内是否处于 Activity 状态,如果不是则启动它。

这不是一个优雅的解决方案,但它首先适合我们。

关于java - Spark-Kafka-Streaming : Offset Management - Can't get manual commit to work (Java),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47885410/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com