gpt4 book ai didi

java - Avro 架构 Java 深度复制问题与字段顺序

转载 作者:行者123 更新时间:2023-11-30 05:38:16 26 4
gpt4 key购买 nike

我目前正在研究使用 Java 处理特定 AVRO 模式演变场景时意外行为的解决方案,并在使用者中进行深度复制以将 GenericRecord 类解析为从 AVRO 模式生成的特定类。

为了解释正在发生的事情,我将使用一个简化的架构示例:

{
"name":"SimpleEvent",
"type":"record",
"namespace":"com.simple.schemas",
"fields":[
{
"name":"firstfield",
"type":"string",
"default":""
},
{
"name":"secondfield",
"type":"string",
"default":""
},
{
"name":"thirdfield",
"type":"string",
"default":""
}
]
}

这只是一个包含三个字符串字段的简单模式,所有字段都是可选的,因为它们具有默认值。假设在某个时候我想添加另一个字符串字段,并删除一个字段,因为它不再需要,你最终会得到这样的结果:

{
"name":"SimpleEvent",
"type":"record",
"namespace":"com.simple.schemas",
"fields":[
{
"name":"firstfield",
"type":"string",
"default":""
},
{
"name":"secondfield",
"type":"string",
"default":""
},
{
"name":"newfield",
"type":"string",
"default":""
}
]
}

根据模式演化规则,这不应破坏更改。然而,当生产者开始使用较新的模式生成事件时,下游消费者中会发生一些奇怪的事情。

原来生成的Java类(我使用Gradle avro插件生成类,但是maven插件和avro工具命令行代码生成产生相同的输出)只看字段顺序,而不看字段顺序不根据名称映射字段。

这意味着字段“newfield”的值被使用旧版本架构读取数据的下游消费者映射到“thirdfield”。

我发现了一些工作,其中 manual mapping是根据名称执行的,但是,这不适用于嵌套对象。

通过一些本地实验,我还发现了另一种可以正确解决架构差异的方法:

    Schema readerSchema = SimpleEvent.getClassSchema();
Schema writerSchema = request.getSchema();

if (readerSchema.equals(writerSchema)){
return (SimpleEvent)SpecificData.get().deepCopy(writerSchema, request);
}

DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(writerSchema);
BinaryEncoder encoder = null;
ByteArrayOutputStream stream = new ByteArrayOutputStream();
encoder = EncoderFactory.get().binaryEncoder(stream, encoder);

writer.write(request, encoder);
encoder.flush();

byte[] recordBytes = stream.toByteArray();

Decoder decoder = DecoderFactory.get().binaryDecoder(recordBytes, null);

SpecificDatumReader<SimpleEvent> specificDatumReader = new SpecificDatumReader(writerSchema, readerSchema);
SimpleEvent result = specificDatumReader.read(null, decoder);
return result;

但是,这似乎是一种相当浪费/不优雅的方法,因为您首先必须将 GenericRecord 转换为 byteArray,然后使用 SpecificDatumReader 再次读取它。

deepcopy 和 datumreader 类之间的区别在于 datumReader 类似乎适应写入器架构与读取器架构不同的场景。

我觉得应该/可以有更好、更优雅的方式来处理这个问题。我真的很感激任何帮助/提示来实现这一目标。

提前致谢:)

奥斯卡

最佳答案

经过更多挖掘和查看我们之前在监听器中使用的 KafkaAvroDeserializer,我注意到 AbstractKafkaAvroDeserializer 有一个反序列化功能,您可以在其中传递阅读器模式。它看起来好得令人难以置信,但它确实有效!

package com.oskar.generic.consumer.demo;

import com.simple.schemas;

import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class SimpleEventDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object> {

private boolean isKey;

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.isKey = isKey;
configure(new KafkaAvroDeserializerConfig(configs));
}

@Override
public Object deserialize(String s, byte[] bytes) {
return super.deserialize(bytes, SimpleEvent.getClassSchema());
}

@Override
public void close() {

}
}

然后在消费者工厂中使用它,如下所示:

@Bean
public ConsumerFactory<String, GenericRecord> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29095");
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "one");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SimpleEventDeserializer.class);

return new DefaultKafkaConsumerFactory<>(props);
}

监听器代码本身如下所示:

 @KafkaListener(topics = "my-topic")
public GenericRecord listen(@Payload GenericRecord request, @Headers MessageHeaders headers) throws IOException {
SimpleEvent event = (SimpleEvent) SpecificData.get().deepCopy(request.getSchema(), request);
return request;
}

关于java - Avro 架构 Java 深度复制问题与字段顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56206425/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com