gpt4 book ai didi

java - 编写自定义 Kafka 序列化器

转载 作者:IT老高 更新时间:2023-10-28 20:49:37 25 4
gpt4 key购买 nike

我在 Kafka 消息中使用我自己的类,它有一堆字符串数据类型。

因此,我不能使用默认的序列化程序类或 Kafka 库附带的 StringSerializer

我想我需要编写自己的序列化程序并将其提供给生产者属性?

最佳答案

编辑

在较新的 Kafka 客户端中,实现 Serializer 而不是 Encoder


编写自定义序列化程序所需的东西是:

  1. 使用为泛型指定的对象实现 Encoder
    • 需要提供 VerifiableProperties 构造函数
  2. 重写 toBytes(...) 方法确保返回一个字节数组
  3. 将序列化器类注入(inject)ProducerConfig

为生产者声明自定义序列化程序

正如您在问题中提到的,Kafka 提供了一种为生产者声明特定序列化程序的方法。序列化程序类设置在 ProducerConfig 实例中,该实例用于构造所需的 Producer 类。

如果您关注 Kafka's Producer Example您将通过 Properties 对象构造 ProducerConfig。在构建属性文件时,请务必包含:

props.put("serializer.class", "path.to.your.CustomSerializer");

包含您希望 Kafka 在将消息附加到日志之前用来序列化消息的类的路径。

创建 Kafka 理解的自定义序列化程序

编写 Kafka 可以正确解释的自定义序列化程序需要实现 Kafka 提供的 Encoder[T] scala 类。 Implementing traits in java is weird ,但以下方法适用于在我的项目中序列化 JSON:

public class JsonEncoder implements Encoder<Object> {
private static final Logger logger = Logger.getLogger(JsonEncoder.class);
// instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
private static final ObjectMapper objectMapper = new ObjectMapper();

public JsonEncoder(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}

@Override
public byte[] toBytes(Object object) {
try {
return objectMapper.writeValueAsString(object).getBytes();
} catch (JsonProcessingException e) {
logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
}
return "".getBytes();
}
}

您的问题听起来好像您正在使用一个对象(我们称之为 CustomMessage)来处理附加到日志的所有消息。如果是这种情况,您的序列化程序可能看起来更像这样:

package com.project.serializer;

public class CustomMessageEncoder implements Encoder<CustomMessage> {
public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}

@Override
public byte[] toBytes(CustomMessage customMessage) {
return customMessage.toBytes();
}
}

这将使您的属性配置看起来像这样:

props.put("serializer.class", "path.to.your.CustomSerializer");

关于java - 编写自定义 Kafka 序列化器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23755976/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com