
json - Spring-Kafka cannot convert AVRO GenericData.Record to Acknowledgment

Reposted. Author: 行者123. Updated: 2023-12-03 02:20:30

Using Spring Boot, I am trying to set up my Kafka consumer in batch receiving mode:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter()); // I know this one won't work
    factory.setBatchListener(true);
    return factory;
}

@Bean
public ConsumerFactory<GenericData.Record, GenericData.Record> consumerFactory() {
    Map<String, Object> dataRiverProps = getDataRiverProps();
    dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
    return new DefaultKafkaConsumerFactory<>(dataRiverProps);
}

This is what the actual consumer looks like:

@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
public void consumeAvro(List<GenericData.Record> list, Acknowledgment ack) {
    messageProcessor.addMessageBatchToExecutor(list);
    // Wait until the executor's backlog drops below the threshold
    while (messageProcessor.getTaskSize() > EXECUTOR_TASK_COUNT_THRESHOLD) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Log the exception itself; an InterruptedException usually has no cause
            LOGGER_ERROR.error(ExceptionUtils.getStackTrace(e));
        }
    }
}

The exception I get looks like this:

nested exception is org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [org.apache.avro.generic.GenericData$Record] to type [org.springframework.kafka.support.Acknowledgment]
at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:46)
at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:191)
at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:174)
at org.springframework.messaging.converter.GenericMessageConverter.fromMessage(GenericMessageConverter.java:66)

The Kafka messages are AVRO messages, and I want to retrieve them as JSON strings. Is there a ready-made AVRO converter for GenericData.Record that I can plug into ConcurrentKafkaListenerContainerFactory? Thanks!

Best answer

Just add the following property to your Kafka consumer configuration:

props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
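For context, here is a rough sketch of where that property might sit in a complete consumer configuration map. It uses the literal key strings behind the constants (for example, "specific.avro.reader" for SPECIFIC_AVRO_READER_CONFIG) so that it compiles without the Kafka or Confluent jars. The class name AvroConsumerProps, the String key deserializer, and the localhost addresses are assumptions for illustration and do not come from the question.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the original question or answer.
public class AvroConsumerProps {

    // Builds a consumer property map using literal config keys.
    static Map<String, Object> build(String bootstrapServers, String schemaRegistryUrl) {
        Map<String, Object> props = new HashMap<>();
        // Same key as ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", bootstrapServers);
        // Assumption: record keys are plain strings
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Confluent Avro deserializer for record values
        props.put("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // The Confluent deserializer needs a schema registry to look up schemas
        props.put("schema.registry.url", schemaRegistryUrl);
        // The property from the answer (SPECIFIC_AVRO_READER_CONFIG)
        props.put("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder addresses; replace with real ones
        build("localhost:9092", "http://localhost:8081")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

A map like this could be passed to DefaultKafkaConsumerFactory in place of the one built in consumerFactory() above. Note that with "specific.avro.reader" enabled, the deserializer returns generated SpecificRecord classes rather than GenericData.Record, so the listener's parameter type may need to change accordingly.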

Regarding json - Spring-Kafka cannot convert AVRO GenericData.Record to Acknowledgment, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/54584067/
