gpt4 book ai didi

java - 反序列化失败时的 Kafka 消费者行为

转载 作者:行者123 更新时间:2023-12-02 10:32:54 33 4
gpt4 key购买 nike

我想知道当对象反序列化失败时如何告诉kafka消费者要从 kafka 中删除记录,请记录错误并继续监听其他传入消息。

我的消费者配置如下:消费者属性:

Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaPropertiesConfiguration.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());

consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
NullDeserializer.class.getName());

consumerProperties.put(ConsumerConfig.CLIENT_ID_CONFIG,
kafkaTelecontrolEventConsumerProperties.getClientIdConfig());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,
kafkaTelecontrolEventConsumerProperties.getGroupIdConfig());

return new DefaultKafkaConsumerFactory<>(consumerProperties);

NullDeserializer.class 是我用于测试的反序列化器:

@Log4j2
public class NullDeserializer implements Deserializer<Object> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {

}

@Override
public Object deserialize(String topic, byte[] data) {
try {
TeleControl.Event.parseFrom(data) ;
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
return null;
}

@Override
public void close() {

}
}

集成流程:

return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(telecontrolEventConsumerFactory,
eventConsumerProperties.getTelecontrolEventTopic())
)
.handle(System.err::println)
.get();

我的主要问题是当解析失败时抛出新的RuntimeException时,如何告诉kafkaConsumer记录错误,删除记录并继续处理下一条kafka消息。

最佳答案

请参阅ErrorHandlingDeserializer 2.2 中引入。

即将发布的 2.2.1 版本(明天发布)将其替换为类型安全的 ErrorHandlingDeserializer2

在这两种情况下,反序列化异常都会传递到监听器容器,而监听器容器又将其直接传递到 ErrorHandler,而不是调用监听器。

关于java - 反序列化失败时的 Kafka 消费者行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53502486/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com