gpt4 book ai didi

google-cloud-dataflow - 用于 GenericRecord 的 Apache Beam 编码器

转载 作者:行者123 更新时间:2023-12-04 15:50:15 24 4
gpt4 key购买 nike

我正在构建一个读取 Avro 通用记录的管道。要在阶段之间传递 GenericRecord,我需要注册 AvroCoder。文档说如果我使用通用记录,架构参数可以是任意的:https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/coders/AvroCoder.html#of-java.lang.Class-org.apache.avro.Schema-

但是,当我将一个空模式传递给方法 AvroCoder.of(Class, Schema)它在运行时引发异常。有没有办法为不需要模式的 GenericRecord 创建 AvroCoder?就我而言,每个 GenericRecord 都有一个嵌入式模式。

异常和堆栈跟踪:

Exception in thread "main" java.lang.NullPointerException
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.checkIndexedRecord(AvroCoder.java:562)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:430)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.check(AvroCoder.java:409)
at org.apache.beam.sdk.coders.AvroCoder.<init>(AvroCoder.java:260)
at org.apache.beam.sdk.coders.AvroCoder.of(AvroCoder.java:141)

最佳答案

我有一个类似的案例并使用自定义编码器解决了它。最简单(但效率较低)的解决方案是将模式与每条记录一起编码。如果您的模式不是太不稳定,您可以从缓存中受益。

public class GenericRecordCoder extends AtomicCoder<GenericRecord> {
public static GenericRecordCoder of() {
return new GenericRecordCoder();
}
private static final ConcurrentHashMap<String, AvroCoder<GenericRecord>> avroCoders = new ConcurrentHashMap<>();

@Override
public void encode(GenericRecord value, OutputStream outStream) throws IOException {
String schemaString = value.getSchema().toString();
String schemaHash = getHash(schemaString);
StringUtf8Coder.of().encode(schemaString, outStream);
StringUtf8Coder.of().encode(schemaHash, outStream);
AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaHash,
s -> AvroCoder.of(value.getSchema()));
coder.encode(value, outStream);
}

@Override
public GenericRecord decode(InputStream inStream) throws IOException {
String schemaString = StringUtf8Coder.of().decode(inStream);
String schemaHash = StringUtf8Coder.of().decode(inStream);
AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaHash,
s -> AvroCoder.of(new Schema.Parser().parse(schemaString)));
return coder.decode(inStream);
}
}

虽然这解决了任务,但实际上我使用外部模式注册表做了一些不同的事情(例如,您可以在数据存储的顶部构建它)。在这种情况下,您不需要序列化/反序列化模式。代码如下所示:
public class GenericRecordCoder extends AtomicCoder<GenericRecord> {
public static GenericRecordCoder of() {
return new GenericRecordCoder();
}
private static final ConcurrentHashMap<String, AvroCoder<GenericRecord>> avroCoders = new ConcurrentHashMap<>();

@Override
public void encode(GenericRecord value, OutputStream outStream) throws IOException {
SchemaRegistry.registerIfAbsent(value.getSchema());
String schemaName = value.getSchema().getFullName();
StringUtf8Coder.of().encode(schemaName, outStream);
AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaName,
s -> AvroCoder.of(value.getSchema()));
coder.encode(value, outStream);
}

@Override
public GenericRecord decode(InputStream inStream) throws IOException {
String schemaName = StringUtf8Coder.of().decode(inStream);
AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaName,
s -> AvroCoder.of(SchemaRegistry.get(schemaName)));
return coder.decode(inStream);
}
}

用法非常简单:
PCollection<GenericRecord> inputCollection = pipeline
.apply(AvroIO
.parseGenericRecords(t -> t)
.withCoder(GenericRecordCoder.of())
.from(...));

关于google-cloud-dataflow - 用于 GenericRecord 的 Apache Beam 编码器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53764518/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com