
java - @KafkaListener not consuming messages - deserialization issue

Reposted. Author: 塔克拉玛干. Updated: 2023-11-02 20:21:26

The message producer, which uses the Kafka binder of Spring Cloud Stream:

@Component
public static class PageViewEventSource implements ApplicationRunner {

    private final MessageChannel pageViewsOut;
    private final Log log = LogFactory.getLog(getClass());

    public PageViewEventSource(AnalyticsBinding binding) {
        this.pageViewsOut = binding.pageViewsOut();
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        List<String> names = Arrays.asList("priya", "dyser", "Ray", "Mark", "Oman", "Larry");
        List<String> pages = Arrays.asList("blog", "facebook", "instagram", "news", "youtube", "about");
        Runnable runnable = () -> {
            String rPage = pages.get(new Random().nextInt(pages.size()));
            // Pick the user name from names (the original picked it from pages by mistake)
            String rName = names.get(new Random().nextInt(names.size()));
            PageViewEvent pageViewEvent = new PageViewEvent(rName, rPage, Math.random() > .5 ? 10 : 1000);

            // Serialize the event to JSON bytes before sending
            Serializer<PageViewEvent> serializer = new JsonSerde<>(PageViewEvent.class).serializer();
            byte[] m = serializer.serialize(null, pageViewEvent);

            Message<byte[]> message = MessageBuilder
                    .withPayload(m).build();

            try {
                this.pageViewsOut.send(message);
                log.info("sent " + message);
            } catch (Exception e) {
                log.error(e);
            }
        };
        // Send one event per second
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);
    }
}

It uses the serialization settings below:

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$BytesSerde

I am trying to consume these messages in a separate consumer application through Spring Kafka's @KafkaListener:

@Service
public class PriceEventConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(PriceEventConsumer.class);

    @KafkaListener(topics = "test1", groupId = "json", containerFactory = "kafkaListenerContainerFactory")
    public void receive(Bytes data) {
    //public void receive(@Payload PageViewEvent data,@Headers MessageHeaders headers) {
        LOG.info("Message received");
        LOG.info("received data='{}'", data);
    }
}

Container factory configuration:

@Bean
public ConsumerFactory<String, Bytes> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            BytesDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Bytes> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

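With this configuration, the consumer does not receive the messages (as Bytes). If I change the Kafka listener to accept a String, it throws the exception shown below: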

@KafkaListener(topics = "test1", groupId = "json", containerFactory = "kafkaListenerContainerFactory")
public void receive(String data) {
    LOG.info("Message received");
    LOG.info("received data='{}'", data);
}

Caused by:

org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.kafka.common.utils.Bytes] to [java.lang.String] for GenericMessage [payload={"userId":"facebook","page":"about","duration":10}, headers={kafka_offset=4213, kafka_consumer=brave.kafka.clients.TracingConsumer@9a75f94, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=test1, kafka_receivedTimestamp=1553007593670}], failedMessage=GenericMessage [payload={"userId":"facebook","page":"about","duration":10}, headers={kafka_offset=4213, kafka_consumer=brave.kafka.clients.TracingConsumer@9a75f94, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=test1, kafka_receivedTimestamp=1553007593670}]
    ... 23 more
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.kafka.common.utils.Bytes] to [java.lang.String] for GenericMessage [payload={"userId":"facebook","page":"about","duration":10}, headers={kafka_offset=4213, kafka_consumer=brave.kafka.clients.TracingConsumer@9a75f94, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=test1, kafka_receivedTimestamp=1553007593670}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    ... 22 more

Any pointers would be very helpful.

Update: the POJO variant

Listener:

@KafkaListener(topics = "test1", groupId = "json", containerFactory = "kafkaListenerContainerFactory")
public void receive(@Payload PageViewEvent data, @Headers MessageHeaders headers) {
    LOG.info("Message received");
    LOG.info("received data='{}'", data);
}

Container factory configuration:

@Bean
public ConsumerFactory<String, PageViewEvent> priceEventConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(PageViewEvent.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> priceEventsKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(priceEventConsumerFactory());
    return factory;
}

Producer:

@Override
public void run(ApplicationArguments args) throws Exception {
    List<String> names = Arrays.asList("priya", "dyser", "Ray", "Mark", "Oman", "Larry");
    List<String> pages = Arrays.asList("blog", "facebook", "instagram", "news", "youtube", "about");
    Runnable runnable = () -> {
        String rPage = pages.get(new Random().nextInt(pages.size()));
        // Pick the user name from names (the original picked it from pages by mistake)
        String rName = names.get(new Random().nextInt(names.size()));
        PageViewEvent pageViewEvent = new PageViewEvent(rName, rPage, Math.random() > .5 ? 10 : 1000);

        // Send the POJO itself and let the binder handle serialization
        Message<PageViewEvent> message = MessageBuilder
                .withPayload(pageViewEvent).build();
        try {
            this.pageViewsOut.send(message);
            log.info("sent " + message);
        } catch (Exception e) {
            log.error(e);
        }
    };

Best answer

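You can deserialize the records from Kafka into a POJO. For versions earlier than 2.2.x, use a MessageConverter. For later versions, the Spring Kafka documentation says:

Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean useHeadersIfPresent (which is true by default).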

@Bean
public ConsumerFactory<String, PageViewEvent> priceEventConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(PageViewEvent.class, false));
}
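
To make the effect of the boolean passed to the JsonDeserializer constructor above more concrete, here is a rough, JDK-only sketch of the decision it controls. This is not Spring Kafka's actual implementation; the class TypeResolutionSketch and the helper resolveTargetType are hypothetical names introduced only for illustration. The sketch assumes that type information travels in a record header named __TypeId__, which is the default header name used by Spring Kafka's JSON type mapper.

```java
import java.util.HashMap;
import java.util.Map;

public class TypeResolutionSketch {

    // Assumed header name carrying the producer-side class name.
    static final String TYPE_ID_HEADER = "__TypeId__";

    // Hypothetical helper: returns the class name the payload would be mapped to.
    // With useHeadersIfPresent == false the header is ignored and the
    // configured target type always wins.
    static String resolveTargetType(Map<String, String> headers,
                                    String configuredTargetType,
                                    boolean useHeadersIfPresent) {
        if (useHeadersIfPresent && headers.containsKey(TYPE_ID_HEADER)) {
            return headers.get(TYPE_ID_HEADER);
        }
        return configuredTargetType;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        // A producer in another application may advertise its own class name.
        headers.put(TYPE_ID_HEADER, "com.producer.PageViewEvent");

        // Header takes precedence when the flag is true.
        System.out.println(resolveTargetType(headers, "com.consumer.PageViewEvent", true));
        // Header is ignored when the flag is false.
        System.out.println(resolveTargetType(headers, "com.consumer.PageViewEvent", false));
    }
}
```

The second call mirrors the configuration above: whatever the producer put in the header, the consumer maps the JSON onto its own PageViewEvent class.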

Or use a MessageConverter:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Bytes> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}
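
As a side note on the exception in the question: it is raised because nothing in the listener chain converts a Bytes value into a String. If the listener keeps its Bytes parameter, the wrapped byte array (available through Bytes.get() in the Kafka client) could be decoded by hand. The following is a minimal, JDK-only sketch of that decoding step; it assumes the payload is UTF-8 encoded JSON, which matches the payload printed in the stack trace, and the class PayloadDecodingSketch and method decodeUtf8 are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

public class PayloadDecodingSketch {

    // Hypothetical helper: turns the raw record value into text.
    // Assumes the producer wrote UTF-8 encoded JSON.
    static String decodeUtf8(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for the byte array a listener would obtain from the record value.
        byte[] raw = "{\"userId\":\"facebook\",\"page\":\"about\",\"duration\":10}"
                .getBytes(StandardCharsets.UTF_8);

        String json = decodeUtf8(raw);
        System.out.println(json);
    }
}
```

This only recovers the JSON text; mapping it onto PageViewEvent still needs a JSON library or one of the two approaches shown in the answer.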

Regarding "java - @KafkaListener not consuming messages - deserialization issue", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/55244407/
