gpt4 book ai didi

java - 从对象构建新的 SourceRecord

转载 作者:行者123 更新时间:2023-11-30 05:36:43 27 4
gpt4 key购买 nike

我正在编写一个 Kafka 连接器,以便从 Github 上的多个源(文本和 yaml 文件)下载一些数据,并将它们转换为某个类的对象,该对象是从 avsc 文件自动生成的:

{
"type": "record",
"name": "MatomoRecord",
"fields": [
{"name": "name", "type": "string"},
{"name": "type", "type": "string"},
{"name": "timestamp", "type": "long"}
]
}

到目前为止一切都很成功。现在我有一个对象映射,我想将其保留在 Kafka 主题中。为此,我尝试创建 SourceRecords:

for (Map.Entry<String, MatomoRecord> record : records.entrySet()) {
sourceRecords.add(new SourceRecord(
sourcePartition,
sourceOffset,
matomoTopic,
0,
org.apache.kafka.connect.data.Schema.STRING_SCHEMA,
record.getKey(),
matomoSchema,
record.getValue())
);
}

如何基于 avro 模式定义 org.apache.kafka.connect.data.Schema 类型的值模式?为了进行测试,我使用生成器手动创建了一个架构:

Schema matomoSchema = SchemaBuilder.struct()
.name("MatomoRecord")
.field("name", Schema.STRING_SCHEMA)
.field("type", Schema.STRING_SCHEMA)
.field("timestamp", Schema.INT64_SCHEMA)
.build();

结果是:

org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class MatomoRecord

有人可以帮我定义基于 avro 架构的值架构吗?

最诚挚的问候马丁

最佳答案

您不能使用record.getValue(),也没有从 Avro 到 Connect Schema 的直接 API(没有 Confluence 的 AvroConverter 的内部方法)

您需要将该对象解析为与您定义的架构匹配的 Struct 对象(假设您的对象字段都不能为空,这看起来很好)

查看 Javadoc 了解如何定义它 https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Struct.html

注意(此处不相关),嵌套结构应从“自下而上”构建,您将子结构/数组放入父结构/数组中。

除了包含您的模型对象之外,您的连接器不一定依赖于 Avro。 Converter 接口(interface)负责将您的 Struct 及其 Schema 转换为其他数据格式(JSON、Confluence 的 Avro 编码、Protobuf 等)

关于java - 从对象构建新的 SourceRecord,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56441118/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com