gpt4 book ai didi

java - 使用 Spring Boot 从 Kafka 队列消费时获取序列化异常

转载 作者:行者123 更新时间:2023-11-30 12:05:25 25 4
gpt4 key购买 nike

我正在使用 Spring Boot 和 Kafka 来消费来自 Kafka 队列的消息。

但是,如果Jackson对payload的解析有任何错误。

消息一直卡住,一直在重新尝试消费,一直报解析异常。

我尝试在 Kafka 配置中使用 ErrorHandlingDeserializer2 并映射错误处理程序,但问题仍然存在。

@Configuration
@EnableKafka
public class KafkaConfig
{

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, StringDeserializer.class);

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);

props.put(ConsumerConfig.GROUP_ID_CONFIG, "${connecto.group-id}");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

return props;
}

@Bean
public ConsumerFactory<String, UserDto> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(
consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<>(UserDto.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserDto> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, UserDto> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());

factory.setErrorHandler(new MosaicKafkaErrorHandler());

return factory;
}

@Bean
public KafkaTemplate kafkaTemplate()
{
return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, UserSearchDto> producerFactory()
{
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
}

public class MosaicKafkaErrorHandler implements ContainerAwareErrorHandler{

@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
MessageListenerContainer container) {
thrownException.printStackTrace();

}

}

我希望以一种方式实现它,无论何时发生配对异常,它都应该只记录该消息并继续处理下一条记录,而不是陷入困境。

错误如下-

Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.gxxx.mxxx.common.dto.UserDetailsDto` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('{"field1":20000,"field2":20000,"type":""}')

at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1230)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:741)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:698)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition kafakaqueue.identify-1 at offset 326355379. If needed, please seek past the record to continue consumption.

最佳答案

为了解决这个问题,version 2.2引入了 ErrorHandlingDeserializer2。此反序列化器委托(delegate)给真正的反序列化器(键或值)。如果委托(delegate)无法反序列化记录内容,ErrorHandlingDeserializer2 会在包含原因和原始字节的 header 中返回空值和 DeserializationException。当您使用记录级 MessageListener 时,如果 ConsumerRecord 包含键或值的 DeserializationException header ,则会使用失败的 ConsumerRecord 调用容器的 ErrorHandler。记录不会传递给监听器。

但是,由于您使用的是早期版本,您可以使用下面的 hack。

     @Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
MessageListenerContainer container) {
thrownException.printStackTrace();
if (thrownException instanceOf SerializationException){
String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

TopicPartition topicPartition = new TopicPartition(topics, partition);
consumer.seek(topicPartition, offset + 1);
}
}

关于java - 使用 Spring Boot 从 Kafka 队列消费时获取序列化异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56310258/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com