gpt4 book ai didi

java - Spring Kafka 和 Kafka Streams

转载 作者:行者123 更新时间:2023-12-04 10:49:14 25 4
gpt4 key购买 nike

在 Spring Boot 应用程序中,我试图配置 Kafka Streams。使用普通的 Kafka 主题,一切正常,但我无法使用 Spring Kafka Streams。

这是我的配置:

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new StreamsConfig(props);
}

@Bean
public KStream<String, String> kStream(StreamsBuilder kStreamBuilder) {

KStream<String, String> stream = kStreamBuilder.stream("post.sent");

stream.mapValues(post -> post.toString()).to("streamingTopic2");

stream.print();

return stream;
}

@Bean
public NewTopic kafkaTopicTest() {
return new NewTopic("streamingTopic2", 1, (short) 1);
}

@KafkaListener(topics = "streamingTopic2", containerFactory = "kafkaListenerContainerFactory")
public void testListener(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) {

String value = consumerRecord.value();

System.out.println("VALUE: " + value);

ack.acknowledge();
}

}

我想创建一个基于 post.sent 的流话题。应用简单的转换并将消息从此流发送到测试 streamingTopic2话题。

现在当我将消息发送到 post.sent我无法立即在“streamingTopic2”中获取它,但在我的应用程序重新启动后,它启动失败并出现以下错误:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition streamingTopic2-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105, 100, 34, 58, 34, 53, 98, 56, 49, 53, 99, 97, 51, 52, 102, 97, 101, 102, 48, 52, 55, 97, 52, 48, 48, 100, 52, 50, 97, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, 83, 69, 78, 84, 34, 44, 34, 101, 120, 116, 101, 114, 110, 97, 108, 80, 111, 115, 116, 73, 100, 34, 58, 34, 48, 53, 54, 97, 57, 51, 49, 101, 45, 56, 97, 53, 100, 45, 52, 100, 52, 52, 45, 97, 101, 50, 48, 45, 53, 99, 51, 53, 52, 56, 57, 52, 98, 97, 53, 49, 34, 44, 34, 99, 104, 97, 116, 78]] from topic [streamingTopic2]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.lang.String` out of START_OBJECT token
at [Source: (byte[])"{"id":"5b815ca34faef047a400d42a","status":"SENT","externalPostId":"056a931e-8a5d-4d44-ae20-5c354894ba51","chatName":.......":"[truncated 626 bytes]; line: 1, column: 1]
at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1342) ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1138) ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1092) ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:63) ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:10) ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1611) ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1234) ~[jackson-databind-2.9.6.jar:2.9.6]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:248) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:224) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:967) ~[kafka-clients-1.1.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:93) ~[kafka-clients-1.1.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1144) ~[kafka-clients-1.1.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993) ~[kafka-clients-1.1.0.jar:na]
at org.apache.kafka.clien

post.sent我发送以下消息 <String, Post>哪里 Post是我自己的复杂类型,但我现在不知道如何将其转换为 <String, String>kStream()为了能够在 testListener() 消费它.

请建议如何使其工作。

最佳答案

关于您的使用

return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class)); in order to define the consumerFactory bean



好吧,我不能说您如何将 Produced 数据放入主题,但是 JSON 解析器失败了。
Cannot deserialize instance of `java.lang.String` out of START_OBJECT token
at [Source: (byte[])"{"id":"5b815ca34faef047a400d42a","status":"SENT","externalPostId":"056a931e-8a5d-4d44-ae20-5c354894ba51","chatName":.......":"[truncated 626 bytes]; line: 1, column: 1]
...
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize

基于 Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105 ... ,我会说你在某个时候做了一个 byte[]生产者,而不是使用 StringSerializer 明确定义或 JSONSerializer在生产过程中。

您可以通过使用 new StringDeserializer() 来解决您的错误。甚至不做任何转换 ByteArrayDeserializer在您的 consumerFactory ,但是您仍然需要处理稍后如何将该事件解析为您想要操作和从中提取字段的对象。

关于java - Spring Kafka 和 Kafka Streams,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52017927/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com