gpt4 book ai didi

Spring Kafka JsonDesirialization MessageConversionException 无法解析类名 未找到类

转载 作者:行者123 更新时间:2023-12-02 22:51:13 34 4
gpt4 key购买 nike

我有两个服务应该通过 Kafka 进行通信。我们将第一个服务命​​名为 WriteService,将第二个服务命名为 QueryService

WriteService方面,我为生产者进行了以下配置。

@Configuration
public class KafkaProducerConfiguration {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);

return props;
}

@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}

我正在尝试发送 com.example.project.web.routes.dto.RouteDto 类的对象

QueryService端,消费者配置定义如下。

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Value("${spring.kafka.groupid}")
private String serviceGroupId;

@Value("${spring.kafka.consumer.trusted-packages}")
private String trustedPackage;

@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.GROUP_ID_CONFIG, serviceGroupId);
props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPackage);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());

return factory;
}

}

监听器具有以下定义。有效负载类具有完全限定名称 - com.example.project.clientqueryview.module.routes.messaging.kafka.RouteDto

@KafkaListener(topics = "${spring.kafka.topics.routes}",
containerFactory = "kafkaListenerContainerFactory")
public void listenForRoute(ConsumerRecord<String, RouteDto> cr,
@Payload RouteDto payload) {
logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
typeIdHeader(cr.headers()), payload, cr.toString());
}

private static String typeIdHeader(Headers headers) {
return StreamSupport.stream(headers.spliterator(), false)
.filter(header -> header.key().equals("__TypeId__"))
.findFirst().map(header -> new String(header.value())).orElse("N/A");
}

发送消息时,出现以下错误

Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.example.project.web.routes.dto.RouteDto]; nested exception is java.lang.ClassNotFoundException: com.example.project.web.routes.dto.RouteDto

这个错误已经很明显了。但是我不明白为什么它默认有这种行为。我不希望在不同的服务中拥有相同的包,这根本没有意义。

我还没有找到一种方法来禁用此功能并使用提供给监听器的类,用 @Payload 注释

如何在不手动配置映射器的情况下解决这个问题?

最佳答案

如果您使用的是 spring-kafka-2.2.x,您可以通过 JsonDeserializer 的重载构造函数禁用默认 header docs

Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean useHeadersIfPresent (which is true by default). The following example shows how to do so:

DefaultKafkaConsumerFactory<String, Object> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

如果版本较低,请使用MessageConverter(您可能会在 spring-kafka-2.1.x 及更高版本中看到此问题)

Spring for Apache Kafka provides a MessageConverter abstraction with the MessagingMessageConverter implementation and its StringJsonMessageConverter and BytesJsonMessageConverter customization. You can inject the MessageConverter into a KafkaTemplate instance directly and by using AbstractKafkaListenerContainerFactory bean definition for the @KafkaListener.containerFactory() property. The following example shows how to do so:

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, RouteDto> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}

@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(RouteDto dto) {
...
}

注意:只有在方法级别声明@KafkaListener注解才能实现这种类型推断。对于类级别的 @KafkaListener,有效负载类型用于选择要调用哪个 @KafkaHandler 方法,因此在选择方法之前它必须已经被转换。

关于Spring Kafka JsonDesirialization MessageConversionException 无法解析类名 未找到类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54690518/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com