gpt4 book ai didi

java - 在 Kafka Streams 中反序列化 POJO

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:46:56 25 4
gpt4 key购买 nike

我的 Kafka 主题有这种格式的消息

user1,subject1,80|user1,subject2,90 

user2,subject1,70|user2,subject2,100

and so on.

我创建了用户 POJO,如下所示。

class User implements Serializable{
/**
*
*/
private static final long serialVersionUID = -253687203767610477L;
private String userId;
private String subject;
private String marks;

public User(String userId, String subject, String marks) {
super();
this.userId = userId;
this.subject = subject;
this.marks = marks;
}

public String getUserId() {
return userId;
}

public void setUserId(String userId) {
this.userId = userId;
}
public String getSubject() {
return subject;
}
public void setSubject(String subject) {
this.subject = subject;
}
public String getMarks() {
return marks;
}
public void setMarks(String marks) {
this.marks = marks;
}
}

此外,我还创建了默认键值序列化

streamProperties.put(
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamProperties.put(
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

我正在尝试按如下方式按用户 ID 查找计数。我还需要用户对象来执行一些其他功能。

KTable<String, Long> wordCount = streamInput

.flatMap(new KeyValueMapper<String, String, Iterable<KeyValue<String,User>>>() {

@Override
public Iterable<KeyValue<String, User>> apply(String key, String value) {
String[] userObjects = value.split("|");
List<KeyValue<String, User>> userList = new LinkedList<>();
for(String userObject: userObjects) {
String[] userData = userObject.split(",");
userList.add(KeyValue.pair(userData[0],
new User(userData[0],userData[1],userData[2])));


}
return userList;
}
})

.groupByKey()
.count();

我收到以下错误

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.example.testing.dao.User). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

我想我需要为用户类提供正确的 Serde

最佳答案

问题出在 Value Serdes。

函数groupBy有两个版本:

  • KStream::KGroupedStream<K, V> groupByKey();
  • KStream::KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped);

引擎盖下的第一个版本调用第二个 Grouped使用默认 Serdes(在您的情况下,它用于键和值 StringSerde

你的 flatMap将消息映射到 KeyValue<String, User>类型所以值是 User 类型.

您的解决方案是使用 groupByKey()调用groupByKey(Grouped.with(keySerde, valSerde)); , 具有适当的 Serdes。

关于java - 在 Kafka Streams 中反序列化 POJO,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51743011/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com