- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我正在玩 Kafka 和流技术;我已经为 KStream 创建了一个自定义序列化器和反序列化器,我将使用它来接收来自给定主题的消息。
现在,问题是我正在以这种方式创建一个 serde:
JsonSerializer<EventMessage> serializer = new JsonSerializer<>();
JsonDeserializer<EventMessage> deserializer = new JsonDeserializer<>(EventMessage.class);
Serde<EventMessage> messageSerde = Serdes.serdeFrom(serializer, deserializer);
序列化器实现:
public class JsonSerializer<T> implements Serializer<T> {
private Gson gson = new Gson();
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String topic, T data) {
return gson.toJson(data).getBytes(Charset.forName("UTF-8"));
}
@Override
public void close() {
}
}
反序列化器实现:
public class JsonDeserializer<T> implements Deserializer<T> {
private Gson gson = new Gson();
private Class<T> deserializedClass;
public JsonDeserializer() {
}
public JsonDeserializer(Class<T> deserializedClass) {
this.deserializedClass = deserializedClass;
}
@Override
@SuppressWarnings("unchecked")
public void configure(Map<String, ?> map, boolean b) {
if(deserializedClass == null) {
deserializedClass = (Class<T>) map.get("serializedClass");
}
}
@Override
public T deserialize(String topic, byte[] data) {
System.out.print(data);
if(data == null){
return null;
}
return gson.fromJson(new String(data),deserializedClass);
}
@Override
public void close() {
}
}
当我尝试执行代码时,我收到以下错误:
Caused by: org.apache.kafka.common.KafkaException: Could not instantiate class org.apache.kafka.common.serialization.Serdes$WrapperSerde Does it have a public no-argument constructor?
此处完整转储:https://pastebin.com/WwpuXuxB
这是我尝试使用 serde 的方式:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, EventMessage> eventsStream = builder.stream(stringSerde, messageSerde, topic);
KStream<String, EventMessage> outStream = eventsStream
.mapValues(value -> EventMessage.build(value.type, value.timestamp));
outStream.to("output");
此外,我不完全确定我是否正确设置了全局设置序列化器和反序列化器的属性:
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, messageSerde.getClass());
最佳答案
为了完成 Matthias 的回答,我刚刚编写了一个简单示例,说明如何在 Kafka Stream 应用程序中创建自定义 Serde(序列化器/反序列化器)。它可用于克隆和试用:https://github.com/Davidcorral94/Kafka-Streams-Custom-Seder
首先,我创建了两个类,一个用于序列化器,另一个用于反序列化器。在这种情况下,我使用 Gson library执行序列化/反序列化。
public class PersonSerializer implements Closeable, AutoCloseable, Serializer<Person> {
private static final Charset CHARSET = Charset.forName("UTF-8");
static private Gson gson = new Gson();
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String s, Person person) {
// Transform the Person object to String
String line = gson.toJson(person);
// Return the bytes from the String 'line'
return line.getBytes(CHARSET);
}
@Override
public void close() {
}
}
public class PersonDeserializer implements Closeable, AutoCloseable, Deserializer<Person> {
private static final Charset CHARSET = Charset.forName("UTF-8");
static private Gson gson = new Gson();
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public Person deserialize(String topic, byte[] bytes) {
try {
// Transform the bytes to String
String person = new String(bytes, CHARSET);
// Return the Person object created from the String 'person'
return gson.fromJson(person, Person.class);
} catch (Exception e) {
throw new IllegalArgumentException("Error reading bytes", e);
}
}
@Override
public void close() {
}
}
然后,我将它们都包装到一个 Serde 中,以便能够在我的 Kafka Stream 应用程序中使用它。
public class PersonSerde implements Serde<Person> {
private PersonSerializer serializer = new PersonSerializer();
private PersonDeserializer deserializer = new PersonDeserializer();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
serializer.configure(configs, isKey);
deserializer.configure(configs, isKey);
}
@Override
public void close() {
serializer.close();
deserializer.close();
}
@Override
public Serializer<Person> serializer() {
return serializer;
}
@Override
public Deserializer<Person> deserializer() {
return deserializer;
}
}
最后,您可以通过下一行将此 Serde 类用于您的 Kafka Stream 应用程序:
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PersonSerde.class);
这实际上是在使用目前可用的最新 Kafka 版本 1.0.0!
关于java - KafkaStreams serde异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44414784/
我有一个使用the #[serde(default)] container attribute的结构。 但是,应该有一个字段是必需的(如果输入数据中不存在该字段,则反序列化器应该出错,而不是退回到默认
我正在尝试使用 actix-web 服务器作为通往小型堆栈的网关,以保证堆栈内部的数据格式严格,同时为用户提供一些自由。 为此,我想将 JSON 字符串反序列化为结构,然后对其进行验证、再次序列化并将
正在使用 apache-hive-0.13.1。在创建表配置单元时抛出如下错误 FAILED: Execution Error, return code 1 from org.apache.hadoo
我想添加一个简单的版本方案 + 检查我的结构: #[derive(Serialize, Deserialize)] struct Versioned { version: u32, o
我正在尝试使用 Serde 加载一个 Toml 文件,它包含多个 bool 值,如果在文本文件中找不到,我想将它们全部默认为 false。 我目前的实现是: #[derive(serde::Deser
如何使用 Serde 为远程类型创建序列化程序代理对象?这是一个最小的例子(playground): use serde; // 1.0.104 use serde_json; // 1.0.48 s
我有一个枚举: #[derive(Serialize, Deserialize)] enum Action { Join, Leave, } 和一个结构: #[derive(Seria
我有一个枚举: #[derive(Serialize, Deserialize)] enum Action { Join, Leave, } 和一个结构: #[derive(Seria
在 Rust 中,我从 websocket 接收数据。为简单起见,它看起来像这样: [1, {"a": ["1.2345", 5, "9.8765"]}] 我从 websocket 获得的字符串确实
我的程序解析足够大的 json 文档(30MB),在 CPU 较慢的机器上需要 70 毫秒,我想加快这个过程,我发现 27% 的解析发生在我的 foo_document_type_deserializ
我正在尝试在配置单元中创建一个正则表达式 serde 来读取一些日志文件,但是在让它工作时遇到了问题... 日志文件看起来像这样...... 14.196.202.16:9123 11329 2
我正在使用映射列的 JSON-Serde 功能来重命名我的 json 文档中的列 'Customer ID' -> 'customer_id。我使用映射函数的原因是因为 HQL 不允许在 CREATE
我有一个包含 Text 键和 DoubleWritable 值的序列文件。当我将文件加载为外部表时 Create external table t (id String, data Double) S
假设我有这个结构: use serde::{Serialize, Deserialize}; #[derive(Deserialize)] struct MyStruct { field_1:
这段代码完全按照我的意愿序列化了一个 32 字节的数组: #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOr
我正在使用 event_emmiter_rs用于我的应用程序中的事件处理。该库允许您订阅带有回调的事件并触发这些事件。事件采用 (strings, value) 的形式,回调采用接受值参数的闭包形式。
给定这个枚举定义: #[repr(u8)] #[derive(Debug, Serialize)] pub enum AnimalType { #[serde(rename = "cat")]
我目前正在使用 Rocket 开发一个 Web API,它使用以下结构进行错误响应: #[derive(Serialize, Deserialize)] pub struct ErrorRespons
我想解析这个日志样本 May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal M
我想从访问日志中提取(ip、requestUrl、timeStamp)以加载到 hive 数据库。访问日志中的一行如下。 66.249.68.6 - - [14/Jan/2012:06:25:03 -
我是一名优秀的程序员,十分优秀!