gpt4 book ai didi

error-handling - 我的kafka错误处理程序没有被调用,我应该显式调用吗?

转载 作者:行者123 更新时间:2023-12-03 08:45:16 26 4
gpt4 key购买 nike

我正在开发一个kafka使用者API,该API会使用来自某个主题的消息。当它使用不正确的消息(例如格式不正确的JSON消息)时,我希望应该调用我的错误处理程序以通知支持小组对不正确的消息采取一些措施。

但是我的错误处理程序不会自动调用。您能告诉我我的代码中缺少什么吗?

如果我将错误处理程序自动连接到侦听器类并显式调用,则一切正常。

错误处理程序类

public class MyErrorHandler implements ErrorHandler, KafkaListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException ex, Consumer<?, ?> consumer) {
.....
}
}

消费者配置
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerGroupFactory());
//factory.getContainerProperties().setPollTimeout(3000);
//factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
//factory.getContainerProperties().setAckOnError(false);
factory.setErrorHandler(new MyErrorHandler());
return factory;
}

@Bean
public MyErrorHandler myErrorHandler() {
return new MyErrorHandler();
}

侦听器类
@KafkaListener(topics = "${kafka.proposal.topic.name}" ,                containerFactory = "kafkaManualAckListenerContainerFactory",                errorHandler ="${kafka.custom.error.handler}")
public void listen(ConsumerRecord<?,?> cr) {
//Logic to get the message from topic and parse it to json, here i am testing with incorrect messages and producing JsonSyntaxException
}

注意:我的属性文件中的kafka.custom.error.handler = myErrorHandler。

我希望我的错误处理程序会自动被调用。但事实并非如此。我是否缺少任何配置。

最佳答案

如果发生反序列化错误,例如在JsonDeserializer中,它发生在Spring获取消息之前,因此我们无法调用错误处理程序。

从2.2版开始,您可以使用ErrorHandlingDeserializer2包装解串器。这允许框架获取失败的反序列化并将其传递给容器的错误处理程序。

When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer2. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer2 returns a null value and a DeserializationException in a header that contains the cause and the raw bytes. When you use a record-level MessageListener, if the ConsumerRecord contains a DeserializationException header for either the key or value, the container’s ErrorHandler is called with the failed ConsumerRecord. The record is not passed to the listener.

关于error-handling - 我的kafka错误处理程序没有被调用,我应该显式调用吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56222542/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com