gpt4 book ai didi

java - 在 Java 中将 protobuf 转换为 bigquery

转载 作者:行者123 更新时间:2023-12-05 04:56:31 32 4
gpt4 key购买 nike

我们将 protobuf 与 GCP 的 pubsub 和数据流结合使用。我们使用单个原型(prototype)文件定义发送到 pubsub 的数据以及 bigquery 模式。

publisher -(send proto)-> pubsub -> dataflow -(write)-> bigquery

有时数据流会进行一些外观上的更改,但主要是将字段从 protobuf 复制到 bigquery。

我的问题是,有没有办法自动将 protobuf 模型转换为 bigquery 的 TableRow?

我们现在拥有的简化数据流代码如下。我想删除 ProtoToTableRow 类中的大部分代码:

public class MyPipeline {
public static void main(String[] args) {
events = pipeline.apply("ReadEvents",
PubsubIO.readProtos(Core.MyProtoObject.class).fromSubscription(subscription));
events.apply("ConvertToTableRows", ParDo.of(new ProtoToTableRow()))
.apply("WriteToBigQuery", BigQueryIO.writeTableRows()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withExtendedErrorInfo()
.to(table));
}
}

// I want this class to be super thin!
class ProtoToTableRow extends DoFn<Core.MyProtoObject, TableRow> {

@ProcessElement
public void processElement(ProcessContext c) {
Core.Foo foo = c.element().getFoo();
TableRow fooRow = new TableRow()
.set("id", foo.getId())
.set("bar", foo.getBar())
.set("baz", foo.getBaz());

// similar code repeated for 100s of lines

TableRow row = new TableRow()
.set("foo", foo)

c.output(row);
}
}

最佳答案

您可以通过非常酷的方式完成此操作。 Beam 为各种类(包括 Java Beans、AutoValue 类以及 Protocol Buffers)提供了模式推理方法。

对于您的管道,您不需要转换为 TableRow,您可以这样做:

pipeline.getSchemaRegistry().registerSchemaProvider(
Core.MyProtoObject.class, new ProtoMessageSchema());

events = pipeline.apply("ReadEvents",
PubsubIO.readProtos(Core.MyProtoObject.class).fromSubscription(subscription));

events.apply("WriteToBigQuery", BigQueryIO.<Core.MyProtoObject>write()
. useBeamSchema()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withExtendedErrorInfo()
.to(table));

注意 BigQueryIO.write 中的 useBeamSchema 参数 - 这将使用自动转换。

关于java - 在 Java 中将 protobuf 转换为 bigquery,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64903586/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com