gpt4 book ai didi

serialization - 如何将 AVRO 序列化器与 Kafka Connect SourceTask 中的架构注册表结合使用

转载 作者:行者123 更新时间:2023-12-01 19:35:47 24 4
gpt4 key购买 nike

我已经设置了 Confluence 数据平台并开始开发 SourceConnector,并在相应的 SourceTask.poll() 方法中执行以下操作(下面是伪 Java 代码):

    public List<SourceRecord> poll() throws InterruptedException {

....

Envelope envelope = new Envelope();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder enc = EncoderFactory.get().binaryEncoder(out, null);
DatumWriter<Envelope> dw = new ReflectDatumWriter<Envelope>(Envelope.class);
dw.write((Envelope)envelope, enc);
enc.flush();
out.close();
Map<String, String> sourcePartition = new HashMap<String, String>();
sourcePartition.put("stream", streamName);
Map<String, Integer> sourceOffset = new HashMap<String, Integer>();
sourceOffset.put("position", Integer.parseInt(envelope.getTimestamp()));
records.add(new SourceRecord(sourcePartition, sourceOffset, topic, org.apache.kafka.connect.data.Schema.BYTES_SCHEMA, envelope));

....

我想使用模式注册表,以便使用注册表中的模式 id 标记正在序列化的对象,序列化,然后通过 poll() 函数发布到 Kafka 主题。如果任意对象的模式不驻留在注册表中,我希望它被注册,并将相应的生成 ID 返回到序列化程序进程,以便它成为序列化对象的一部分,使其可反序列化。

我需要在上面的代码中做什么才能实现这一点?

最佳答案

要使用 SchemaRegistry,您必须使用 Confluence 提供的类序列化/反序列化数据:

  • io.confluence.kafka.serializers.KafkaAvroSerializer
  • io.confluence.kafka.serializers.KafkaAvroDeserializer

这些类包含从注册表注册和请求架构的所有逻辑。

如果您使用maven,您可以添加此依赖项:

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>2.0.1</version>
</dependency>

关于serialization - 如何将 AVRO 序列化器与 Kafka Connect SourceTask 中的架构注册表结合使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37317567/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com