gpt4 book ai didi

java - 如何在 Spring Kafka Consumer 中跳过损坏的(不可序列化的)消息?

转载 作者:行者123 更新时间:2023-12-02 08:30:14 25 4
gpt4 key购买 nike

这个问题是针对Spring Kafka的,与Apache Kafka with High Level Consumer: Skip corrupted messages相关。

有没有办法配置 Spring Kafka 消费者跳过无法读取/处理(已损坏)的记录?

我看到一种情况,如果无法反序列化,消费者就会陷入同一条记录上。这是消费者抛出的错误。

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value 

消费者轮询主题,并在循环中不断打印相同的错误,直到程序被终止。

在具有以下消费者工厂配置的@KafkaListener中,

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

最佳答案

您需要ErrorHandlingDeserializer:https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#error-handling-deserializer

如果您无法迁移到该 2.2 版本,请考虑实现您自己的版本,并为那些无法正确反序列化的记录返回 null

源代码在这里:https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java

关于java - 如何在 Spring Kafka Consumer 中跳过损坏的(不可序列化的)消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53327998/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com