gpt4 book ai didi

java - 卡夫卡流 : Processor sometimes processes same messages upon application restart

转载 作者:行者123 更新时间:2023-11-29 08:23:08 25 4
gpt4 key购买 nike

在我的 Java 应用程序中,我有一个 Kafka 处理器。

我的处理方法是这样的:

@Override
public void process(String key, String value) {
System.out.println("In the process method, the offset is: " + context.offset());
//Some more code
}

其中 context 是来自 init 方法的 ProcessorContext。

我启动应用程序并记录下来:

In the process method, the offset is: 1203
In the process method, the offset is: 1204

然后我再次启动应用程序,我得到了相同的消息。在几次应用程序重新启动后(或者一段时间后,我找不到模式),过程方法停止被调用,我不再在应用程序启动时收到这些消息。

知道为什么有时会多次处理这些消息吗?

我的流配置具有以下属性:

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someId");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
props.put(StreamsConfig.STATE_DIR_CONFIG, "somedir");

编辑

下面的代码片段展示了我如何创建 KafkaStreams:

public class KafkaStreamsProcessorBean implements SmartLifecycle {
@Override
public synchronized void start() {
final KStreamBuilder builder = new KStreamBuilder();

final KStream<String, String> debeziumStream = builder.stream("debezium.topic");
debeziumStream.process(() -> debeziumProcessor);

kafkaStreams = new KafkaStreams(builder, streamsConfig);
kafkaStreams.start();
}
}

这里的 streamsConfig 是具有我显示的属性的配置,debeziumProcessor 是第一个代码片段中的 Kafka 处理器。

最佳答案

默认情况下,Kafka Streams 处理保证是至少一次。这意味着可以重新处理消息。

在您的情况下,即使您将 StreamsConfig.PROCESSING_GUARANTEE_CONFIG 设置为 StreamsConfig.EXACTLY_ONCE,您也可以在重启后看到相同的日志(具有相同的偏移信息)。

处理保证是关于在一次事务中向主题写入偏移量结果。这并不意味着消息不能处理多次(使用相同的键和值多次调用 Processor::process(...))。

可能出现以下情况:

  • 消息已读。
  • Processor::process(...) 被调用。
  • 应用程序完成但未写入偏移量。
  • 重启后应用程序将读取相同的消息,并且将调用相同键和值的 Processor::process(...)

关于java - 卡夫卡流 : Processor sometimes processes same messages upon application restart,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55793667/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com