gpt4 book ai didi

scala - 区分 AVRO 联合类型

转载 作者:行者123 更新时间:2023-12-01 08:23:25 32 4
gpt4 key购买 nike

我正在使用“自动”反序列化器从 Kafka 消费 Avro 序列化消息,例如:

props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroDeserializer"
);
props.put("schema.registry.url", "https://example.com");

这非常有效,并且在 https://docs.confluent.io/current/schema-registry/serializer-formatter.html#serializer 的文档之外.

我面临的问题是我实际上只想转发这些消息,但要进行路由我需要一些来自内部的元数据。一些技术限制意味着我无法切实可行地编译生成的类文件以使用 KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG => true,因此我使用的是常规解码器而没有绑定(bind)到 Kafka,具体来说就是阅读字节作为 Array[Byte] 并将它们传递给手动构造的反序列化器:

var maxSchemasToCache = 1000;
var schemaRegistryURL = "https://example.com/"
var specificDeserializerProps = Map(
"schema.registry.url"
-> schemaRegistryURL,
KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG
-> "false"
);
var client = new CachedSchemaRegistryClient(
schemaRegistryURL,
maxSchemasToCache
);
var deserializer = new KafkaAvroDeserializer(
client,
specificDeserializerProps.asJava
);

消息是一种“容器”类型,真正有趣的部分是 union { A, B, C } msg 记录字段中的大约 25 种类型:

record Event {
timestamp_ms created_at;
union {
Online,
Offline,
Available,
Unavailable,
...
...Failed,
...Updated
} msg;
}

所以我成功地将 Array[Byte] 读入 record 并将其送入反序列化器,如下所示:

var genericRecord = deserializer.deserialize(topic, consumerRecord.value())
.asInstanceOf[GenericRecord];
var schema = genericRecord.getSchema();
var msgSchema = schema.getField("msg").schema();

然而,问题是我无法通过联合来辨别、区分或“解析”msg 字段的“类型”:

System.out.printf(
"msg.schema = %s msg.schema.getType = %s\n",
msgSchema.getFullName(),
msgSchema.getType().name());
=> msg.schema = union msg.schema.getType = union

这种场景下如何区分类型?合流注册表知道,这些东西有名称,它们有“类型”,即使我将它们视为 GenericRecords

我的目标是知道record.msg 是“type” Online |离线 |可用而不只是知道它是一个union

最佳答案

在研究了 AVRO Java 库的实现之后,可以肯定地说,鉴于当前的 API,这是不可能的。我发现了以下在解析时提取类型的方法,使用自定义 GenericDatumReader 子类,但在我在生产代码中使用类似这样的东西之前需要大量改进 :D

所以这是子类:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.ResolvingDecoder;

import java.io.IOException;
import java.util.List;

public class CustomReader<D> extends GenericDatumReader<D> {
private final GenericData data;
private Schema actual;
private Schema expected;

private ResolvingDecoder creatorResolver = null;
private final Thread creator;
private List<Schema> unionTypes;

// vvv This is the constructor I've modified, added a list of types
public CustomReader(Schema schema, List<Schema> unionTypes) {
this(schema, schema, GenericData.get());
this.unionTypes = unionTypes;
}

public CustomReader(Schema writer, Schema reader, GenericData data) {
this(data);
this.actual = writer;
this.expected = reader;
}

protected CustomReader(GenericData data) {
this.data = data;
this.creator = Thread.currentThread();
}

protected Object readWithoutConversion(Object old, Schema expected, ResolvingDecoder in) throws IOException {
switch (expected.getType()) {
case RECORD:
return super.readRecord(old, expected, in);
case ENUM:
return super.readEnum(expected, in);
case ARRAY:
return super.readArray(old, expected, in);
case MAP:
return super.readMap(old, expected, in);
case UNION:
// vvv The magic happens here
Schema type = expected.getTypes().get(in.readIndex());
unionTypes.add(type);
return super.read(old, type, in);
case FIXED:
return super.readFixed(old, expected, in);
case STRING:
return super.readString(old, expected, in);
case BYTES:
return super.readBytes(old, expected, in);
case INT:
return super.readInt(old, expected, in);
case LONG:
return in.readLong();
case FLOAT:
return in.readFloat();
case DOUBLE:
return in.readDouble();
case BOOLEAN:
return in.readBoolean();
case NULL:
in.readNull();
return null;
default:
return super.readWithoutConversion(old, expected, in);
}
}
}

我已经为有趣的部分在代码中添加了注释,因为它主要是样板文件。

然后你可以像这样使用这个自定义阅读器:

        List<Schema> unionTypes = new ArrayList<>();
DatumReader<GenericRecord> datumReader = new CustomReader<GenericRecord>(schema, unionTypes);
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(eventFile, datumReader);
GenericRecord event = null;

while (dataFileReader.hasNext()) {
event = dataFileReader.next(event);
}

System.out.println(unionTypes);

这将为解析的每个 union 打印该 union 的类型。请注意,您必须根据记录中有多少个联合等因素确定该列表中的哪个元素对您感兴趣。

不太漂亮 tbh :D

关于scala - 区分 AVRO 联合类型,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59838118/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com