spring-boot - Cannot convert from [java.lang.String] to [com.example.demo.User]

Reposted. Author: 行者123. Updated: 2023-12-05 06:51:05
I am working with Spring Boot and Apache Kafka, trying to make the configuration user-defined, and I get the following error:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.demo.Consumer.consume(com.example.demo.User) throws java.io.IOException]
Bean [com.example.demo.Consumer@7cd4a8cc]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.demo.User] for GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}], failedMessage=GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.demo.User] for GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}], failedMessage=GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2110) ~[spring-kafka-2.6.5.jar:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2098) ~[spring-kafka-2.6.5.jar:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1997) ~[spring-kafka-2.6.5.jar:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1924) ~[spring-kafka-2.6.5.jar:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1812) ~[spring-kafka-2.6.5.jar:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531) ~[spring-kafka-2.6.5.jar:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178) ~[spring-kafka-2.6.5.jar:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.5.jar:2.6.5]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_171]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_171]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.demo.User] for GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}], failedMessage=GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:340) ~[spring-kafka-2.6.5.jar:2.6.5]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86) ~[spring-kafka-2.6.5.jar:2.6.5]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.6.5.jar:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2065) ~[spring-kafka-2.6.5.jar:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2047) ~[spring-kafka-2.6.5.jar:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1984) ~[spring-kafka-2.6.5.jar:2.6.5]
... 8 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.demo.User] for GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.3.3.jar:5.3.3]
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:926) ~[spring-kafka-2.6.5.jar:2.6.5]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.3.3.jar:5.3.3]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.3.3.jar:5.3.3]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.3.3.jar:5.3.3]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.6.5.jar:2.6.5]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329) ~[spring-kafka-2.6.5.jar:2.6.5]
... 13 common frames omitted

How can we solve this?

Here is my code:

User.java

    @Builder
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class User {
        private String name;
        private int age;
    }

KafkaProducerConfig.java

    @Configuration
    public class KafkaProducerConfig {
        @Value(value = "${kafka.bootstrapAddress}")
        private String bootstrapAddress;

        // 1. Send string to Kafka
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(props);
        }

        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }

        // 2. Send User objects to Kafka
        @Bean
        public ProducerFactory<String, User> userProducerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        @Bean
        public KafkaTemplate<String, User> userKafkaTemplate() {
            return new KafkaTemplate<>(userProducerFactory());
        }
    }

Producer.java

    @Service
    public class Producer {

        private static final Logger logger = LoggerFactory.getLogger(Producer.class);
        private static final String TOPIC = "users";

        @Autowired
        private KafkaTemplate<String, User> kafkaTemplate;

        public void sendMessage(User user) {
            logger.info(String.format("#### -> Producing message -> %s", user.toString()));
            this.kafkaTemplate.send(TOPIC, user);
        }
    }

KafkaConsumerConfig.java

    @Configuration
    public class KafkaConsumerConfig {
        @Value(value = "${kafka.bootstrapAddress}")
        private String bootstrapAddress;

        @Value(value = "${general.topic.group.id}")
        private String groupId;

        @Value(value = "${user.topic.group.id}")
        private String userGroupId;

        // 1. Consume string data from Kafka
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
            return new DefaultKafkaConsumerFactory<>(props);
        }

        // 2. Consume user objects from Kafka
        public ConsumerFactory<String, User> userConsumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, userGroupId);
            props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
            return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(User.class));
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(userConsumerFactory());
            factory.setMessageConverter(new StringJsonMessageConverter());
            return factory;
        }
    }

Consumer.java

    @Service
    public class Consumer {

        private final Logger logger = LoggerFactory.getLogger(Producer.class);

        @KafkaListener(topics = "users", groupId = "group_id")
        public void consume(User user) throws IOException {
            logger.info(String.format("#### -> Consumed message -> %s", user.toString()));
        }
    }

KafkaController.java

    @RestController
    @RequestMapping(value = "/kafka")
    public class KafkaController {

        private final Producer producer;

        @Autowired
        KafkaController(Producer producer) {
            this.producer = producer;
        }

        @PostMapping(value = "/publish")
        public void sendMessageToKafkaTopic(@RequestBody User user) {
            this.producer.sendMessage(user);
        }
    }

KafkaExampleApplication.java

    @SpringBootApplication
    public class KafkaExampleApplication {

        public static void main(String[] args) {
            SpringApplication.run(KafkaExampleApplication.class, args);
        }
    }

application.properties

server.port=9000

kafka.bootstrapAddress=localhost:9092
general.topic.group.id=group_id
user.topic.group.id=MyGrpId

Best answer

In your Producer class, you need to rename kafkaTemplate to userKafkaTemplate, which is wired to the user producer factory with the JSON serializer that you want to use.
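As a rough sketch of the rename described above, the Producer might look like the following. The `@Qualifier` annotation is not part of the answer; it is an assumption added here only to make the choice of bean explicit instead of relying on the field name.

```java
package com.example.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private static final String TOPIC = "users";

    // Field renamed to match the userKafkaTemplate() bean in KafkaProducerConfig.
    // @Qualifier is an assumption added for explicitness; the answer only asks for the rename.
    @Autowired
    @Qualifier("userKafkaTemplate")
    private KafkaTemplate<String, User> userKafkaTemplate;

    public void sendMessage(User user) {
        logger.info("#### -> Producing message -> {}", user);
        // The value is serialized by the JsonSerializer configured in userProducerFactory().
        this.userKafkaTemplate.send(TOPIC, user);
    }
}
```

The logged message already carries a `__TypeId__` header, which spring-kafka's `JsonSerializer` adds by default. If the error persists after this change, it may also be worth checking which container factory the `@KafkaListener` uses (its `containerFactory` attribute); that observation goes beyond the original answer.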

In fact, you should probably delete the other template and producer factory if you are not using them.
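If only User messages are sent, the producer configuration could be reduced to something like this. It is a sketch that keeps the bean names from the question and assumes nothing else in the application injects the String template.

```java
package com.example.demo;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    // Only the User producer factory remains; the String factory and template are removed.
    @Bean
    public ProducerFactory<String, User> userProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    // With a single KafkaTemplate bean left, injection by type is no longer ambiguous.
    @Bean
    public KafkaTemplate<String, User> userKafkaTemplate() {
        return new KafkaTemplate<>(userProducerFactory());
    }
}
```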

Regarding spring-boot - Cannot convert from [java.lang.String] to [com.example.demo.User], a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/66271759/
