gpt4 book ai didi

serialization - Kafka 对象序列化

转载 作者:行者123 更新时间:2023-12-04 15:25:14 25 4
gpt4 key购买 nike

这个问题在这里已经有了答案:





Writing Custom Kafka Serializer

(3 个回答)


去年关闭。




我开始玩卡夫卡。我已经设置了一个 zookeeper 配置,并且我设法发送和使用 String 消息。
现在我正在尝试传递一个对象(在 Java 中),但是由于某种原因,在解析消费者中的 Message 时,我遇到了 header 问题。我尝试了几个序列化选项(使用解码器/编码器),并且所有的都返回相同的 header 问题。

这是我的代码
制作人:

        Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("serializer.class", "com.inneractive.reporter.kafka.EventsDataSerializer");
ProducerConfig config = new ProducerConfig(props);
Producer<Long, EventDetails> producer = new Producer<Long, EventDetails>(config);
ProducerData<Long, EventDetails> data = new ProducerData<Long, EventDetails>("test3", 1, Arrays.asList(new EventDetails());
try {
producer.send(data);
} finally {
producer.close();
}

而消费者:
        Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");

// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
Map<String, List<KafkaMessageStream<EventDetails>>> topicMessageStreams =
consumerConnector.createMessageStreams(ImmutableMap.of("test3", 4), new EventsDataSerializer());
List<KafkaMessageStream<EventDetails>> streams = topicMessageStreams.get("test3");

// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(4);

// consume the messages in the threads
for (final KafkaMessageStream<EventDetails> stream: streams) {
executor.submit(new Runnable() {
public void run() {
for(EventDetails event: stream) {
System.err.println("********** Got message" + event.toString());
}
}
});
}

和我的序列化程序:
public  class EventsDataSerializer implements Encoder<EventDetails>, Decoder<EventDetails> {
public Message toMessage(EventDetails eventDetails) {
try {
ObjectMapper mapper = new ObjectMapper(new SmileFactory());
byte[] serialized = mapper.writeValueAsBytes(eventDetails);
return new Message(serialized);
} catch (IOException e) {
e.printStackTrace();
return null; // TODO
}
}
public EventDetails toEvent(Message message) {
EventDetails event = new EventDetails();

ObjectMapper mapper = new ObjectMapper(new SmileFactory());
try {
//TODO handle error
return mapper.readValue(message.payload().array(), EventDetails.class);
} catch (IOException e) {
e.printStackTrace();
return null;
}

}
}

这是我得到的错误:
org.codehaus.jackson.JsonParseException: Input does not start with Smile format header (first byte = 0x0) and parser has REQUIRE_HEADER enabled: can not parse
at [Source: N/A; line: -1, column: -1]

当我与 MessagePack 一起工作时并以简单的方式写入 ObjectOutputStream我有一个类似的标题问题。我还尝试将有效负载 CRC32 添加到消息中,但这也无济于事。

我在这里做错了什么?

最佳答案

嗯,我没有遇到与您遇到的相同的头文件问题,但是当我没有提供 VerifiableProperties 时,我的项目没有正确编译。我的编码器/解码器中的构造函数。不过,缺少的构造函数会破坏 jackson 的反序列化,这似乎很奇怪。

也许尝试拆分编码器和解码器并包含 VerifiableProperties两者的构造函数;你不应该需要实现 Decoder[T]用于序列化。我能够使用 ObjectMapper 成功实现 json 反/序列化遵循 this post 中的格式.

祝你好运!

关于serialization - Kafka 对象序列化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11492741/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com