gpt4 book ai didi

java - KafkaStreams serde异常

转载 作者:搜寻专家 更新时间:2023-11-01 01:20:24 25 4
gpt4 key购买 nike

我正在玩 Kafka 和流技术;我已经为 KStream 创建了一个自定义序列化器和反序列化器,我将使用它来接收来自给定主题的消息。

现在,问题是我正在以这种方式创建一个 serde:

JsonSerializer<EventMessage> serializer = new JsonSerializer<>();
JsonDeserializer<EventMessage> deserializer = new JsonDeserializer<>(EventMessage.class);
Serde<EventMessage> messageSerde = Serdes.serdeFrom(serializer, deserializer);

序列化器实现:

public class JsonSerializer<T> implements Serializer<T> {

private Gson gson = new Gson();

public void configure(Map<String, ?> map, boolean b) {
}

@Override
public byte[] serialize(String topic, T data) {
return gson.toJson(data).getBytes(Charset.forName("UTF-8"));
}

@Override
public void close() {

}
}

反序列化器实现:

public class JsonDeserializer<T> implements Deserializer<T> {

private Gson gson = new Gson();
private Class<T> deserializedClass;

public JsonDeserializer() {

}

public JsonDeserializer(Class<T> deserializedClass) {
this.deserializedClass = deserializedClass;
}

@Override
@SuppressWarnings("unchecked")
public void configure(Map<String, ?> map, boolean b) {
if(deserializedClass == null) {
deserializedClass = (Class<T>) map.get("serializedClass");
}
}

@Override
public T deserialize(String topic, byte[] data) {
System.out.print(data);
if(data == null){
return null;
}

return gson.fromJson(new String(data),deserializedClass);
}

@Override
public void close() {

}
}

当我尝试执行代码时,我收到以下错误:

Caused by: org.apache.kafka.common.KafkaException: Could not instantiate class org.apache.kafka.common.serialization.Serdes$WrapperSerde Does it have a public no-argument constructor?

此处完整转储:https://pastebin.com/WwpuXuxB

这是我尝试使用 serde 的方式:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, EventMessage> eventsStream = builder.stream(stringSerde, messageSerde, topic);

KStream<String, EventMessage> outStream = eventsStream
.mapValues(value -> EventMessage.build(value.type, value.timestamp));

outStream.to("output");

此外,我不完全确定我是否正确设置了全局设置序列化器和反序列化器的属性:

streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, messageSerde.getClass());

最佳答案

为了完成 Matthias 的回答,我刚刚编写了一个简单示例,说明如何在 Kafka Stream 应用程序中创建自定义 Serde(序列化器/反序列化器)。它可用于克隆和试用:https://github.com/Davidcorral94/Kafka-Streams-Custom-Seder

首先,我创建了两个类,一个用于序列化器,另一个用于反序列化器。在这种情况下,我使用 Gson library执行序列化/反序列化。

序列化器

public class PersonSerializer implements Closeable, AutoCloseable, Serializer<Person> {

private static final Charset CHARSET = Charset.forName("UTF-8");
static private Gson gson = new Gson();

@Override
public void configure(Map<String, ?> map, boolean b) {
}

@Override
public byte[] serialize(String s, Person person) {
// Transform the Person object to String
String line = gson.toJson(person);
// Return the bytes from the String 'line'
return line.getBytes(CHARSET);
}

@Override
public void close() {

}
}

反序列化器

public class PersonDeserializer implements Closeable, AutoCloseable, Deserializer<Person> {

private static final Charset CHARSET = Charset.forName("UTF-8");
static private Gson gson = new Gson();

@Override
public void configure(Map<String, ?> map, boolean b) {
}

@Override
public Person deserialize(String topic, byte[] bytes) {
try {
// Transform the bytes to String
String person = new String(bytes, CHARSET);
// Return the Person object created from the String 'person'
return gson.fromJson(person, Person.class);
} catch (Exception e) {
throw new IllegalArgumentException("Error reading bytes", e);
}
}

@Override
public void close() {

}
}

然后,我将它们都包装到一个 Serde 中,以便能够在我的 Kafka Stream 应用程序中使用它。

服务

public class PersonSerde implements Serde<Person> {
private PersonSerializer serializer = new PersonSerializer();
private PersonDeserializer deserializer = new PersonDeserializer();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
serializer.configure(configs, isKey);
deserializer.configure(configs, isKey);
}

@Override
public void close() {
serializer.close();
deserializer.close();
}

@Override
public Serializer<Person> serializer() {
return serializer;
}

@Override
public Deserializer<Person> deserializer() {
return deserializer;
}
}

最后,您可以通过下一行将此 Serde 类用于您的 Kafka Stream 应用程序:

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PersonSerde.class);

这实际上是在使用目前可用的最新 Kafka 版本 1.0.0!

关于java - KafkaStreams serde异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44414784/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com