gpt4 book ai didi

java - 在 Kafka Stream 中处理消息时发生错误时重新处理消息

转载 作者:行者123 更新时间:2023-12-02 09:50:37 46 4
gpt4 key购买 nike

我有一个基于 Kafka Streams 的简单 Spring 应用程序,它使用来自传入主题的消息,进行 map 转换并打印该消息。 KStream 配置如下

@Bean
public KStream<?, ?> processingPipeline(StreamsBuilder builder, MyTransformer myTransformer,
PrintAction printAction, String topicName) {
KStream<String, JsonNode> source = builder.stream(topicName,
Consumed.with(Serdes.String(), new JsonSerde<>(JsonNode.class)));
// @formatter:off
source
.map(myTransformer)
.foreach(printAction);
// @formatter:on
return source;
}

MyTransformer内,我正在调用外部微服务,该服务此时可能会关闭。如果调用失败(通常抛出 RuntimeException),我将无法进行转换。

问题是,如果之前的处理过程中发生任何错误,有什么方法可以再次重新处理 Streams 应用程序中的消息?

根据我目前的研究,没有办法这样做,我唯一的可能性是将消息推送到死信主题,并尝试在将来处理它,如果它再次失败,我再次将其推送到 DLT 并执行以这种方式重试。

最佳答案

如果在 Kafka Streams 处理期间发生任何未捕获的异常,您的流会将状态更改为 ERROR 并停止使用发生错误的分区的传入消息。你需要自己捕获异常。重试可以通过以下两种方式实现:1) 使用 Spring RetryTemplate 调用外部微服务(但请记住,从特定分区消费消息会出现延迟),或者 2) 将失败的消息推送到另一个主题供以后重新处理(按照您的建议)

<小时/>

kafka-streams 2.8.0

以来更新

kafka-streams 2.8.0 起,您可以自动替换失败的流线程(由未捕获的异常引起的)使用 KafkaStreams 方法 void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);StreamThreadExceptionResponse.REPLACE_THREAD。更多详情请查看Kafka Streams Specific Uncaught Exception Handler

kafkaStreams.setUncaughtExceptionHandler(ex -> {
log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

关于java - 在 Kafka Stream 中处理消息时发生错误时重新处理消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56342075/

46 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com