gpt4 book ai didi

java - Kafka Connect API 和 Avro 对象(SourceRecord 与 org.apache.avro.Schema)

转载 作者:行者123 更新时间:2023-12-01 18:35:35 25 4
gpt4 key购买 nike

我在使用 kafka 源连接器将 Avro 对象(org.apache.avro.specific.SpecificRecord 实例)发送到 kafka 主题时遇到问题(需要准备 SourceRecord 实例)。就我而言,我假设基于模式:

{
"namespace": "com.model.avro.generated",
"type": "record",
"name": " MessageExVal",
"version": "1",
"fields": [
{
"name": "messageSource",
"type": "string"
},
{
"name": "messageSourceVersion",
"type": [
"string",
"null"
]
}
]
}

在 maven 的 avro-maven-plugin 的帮助下,我将生成项目中使用的类的模型。MessageExVal 类的实例为我提供了“org.apache.avro.Schema”(通过方法 getSchema() 或 getClassSchema())。从第二方kafka连接api需要我org.apache.kafka.connect.data.Schema能够创建由poll()返回的SourceRecord的新实例) 源连接器的方法。在配置中我提供参数:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",

在“poll”方法之后执行的方法fromConnectData()中的AvroConverter代码中,我看到从org.apache.kafka.connect.data的转换.Schemaorg.apache.avro.Schema 将完成。那么是否有任何选项可以传递 avro 模式而不首先将其转换为“连接版本”,因为稍后它无论如何都会转换回 avro ?下面您可以找到我所引用的代码中带有注释点的 poll 方法的实现:

@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> records = new LinkedList<SourceRecord>();
MessageExVal myValue = MessageExVal.newBuilder()
.setMessageType(“some value”)
.setMessageSource(“some other value”)
.build();
SourceRecord sr = new SourceRecord(null, null,
"test_topic",
myValue.getSchema(), //incorrect - different types
myValue);
records.add(sr);
return records;
}

总结一切,我的问题是如何使用 kafka connect SourceConnector 将“myValue”放入主题中?我将非常感谢每一个提示:)

最佳答案

because later it is anyway converted back to avro ?

数据以二进制形式存储在主题内,因此您仍然需要支付反序列化成本

kafka connect api requires from me org.apache.kafka.connect.data.Schema to be able to create new instance of SourceRecord

是的。您可以使用 toConnectData 来获取该信息,或者您也可以从代码的依赖项中删除 Avro,并直接从 Connect 创建 Schema 和 Struct 实例。

转换器负责序列化,Connect 中不需要 Avro

关于java - Kafka Connect API 和 Avro 对象(SourceRecord 与 org.apache.avro.Schema),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60058196/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com