gpt4 book ai didi

google-cloud-dataflow - 将侧输入应用于 Apache Beam 中的 BigQueryIO.read 操作

转载 作者:行者123 更新时间:2023-12-01 11:19:45 24 4
gpt4 key购买 nike

有没有办法将侧输入应用于 Apache Beam 中的 BigQueryIO.read() 操作。

举例来说,我在 PCollection 中有一个值,我想在查询中使用它来从 BigQuery 表中获取数据。这可以使用侧面输入吗?或者在这种情况下应该使用其他东西吗?

我在类似的情况下使用了 NestedValueProvider,但我想我们只能在某个值取决于我的运行时值时使用它。或者我可以在这里使用相同的东西吗?如果我错了,请纠正我。

我试过的代码:

Bigquery bigQueryClient = start_pipeline.newBigQueryClient(options.as(BigQueryOptions.class)).build();
Tabledata tableRequest = bigQueryClient.tabledata();

PCollection<TableRow> existingData = readData.apply("Read existing data",ParDo.of(new DoFn<String,TableRow>(){
@ProcessElement
public void processElement(ProcessContext c) throws IOException
{
List<TableRow> list = c.sideInput(bqDataView);
String tableName = list.get(0).get("table").toString();
TableDataList table = tableRequest.list("projectID","DatasetID",tableName).execute();

for(TableRow row:table.getRows())
{
c.output(row);
}
}
}).withSideInputs(bqDataView));

我得到的错误是:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize BeamTest.StarterPipeline$1@86b455
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
at BeamTest.StarterPipeline.main(StarterPipeline.java:158)
Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.Bigquery$Tabledata
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.writeObject(Unknown Source)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
... 4 more

最佳答案

Beam 模型目前不能很好地支持这种依赖于数据的操作。

一种方法是编写自己的代码 DoFn接收侧输入并直接连接到 BQ。不幸的是,这不会给你任何并行性,因为 DoFn将完全在同一线程上运行。

一次可拆分 DoFn Beam 支持 s,这将是一个不同的故事。

在当前的世界状态下,您需要使用 BQ client library添加查询 BQ 的代码,就像您不在 Beam 管道中一样。

鉴于您问题中的代码,关于如何实现这一点的粗略想法如下:

class ReadDataDoFn extends DoFn<String,TableRow>(){
private Tabledata tableRequest;

private Bigquery bigQueryClient;

private Bigquery createBigQueryClientWithinDoFn() {
// I'm not sure how you'd implement this, but you had the right idea
}

@Setup
public void setup() {
bigQueryClient = createBigQueryClientWithinDoFn();
tableRequest = bigQueryClient.tabledata();
}
@ProcessElement
public void processElement(ProcessContext c) throws IOException
{
List<TableRow> list = c.sideInput(bqDataView);
String tableName = list.get(0).get("table").toString();
TableDataList table = tableRequest.list("projectID","DatasetID",tableName).execute();

for(TableRow row:table.getRows())
{
c.output(row);
}
}
}

PCollection<TableRow> existingData = readData.apply("Read existing data",ParDo.of(new ReadDataDoFn()));

关于google-cloud-dataflow - 将侧输入应用于 Apache Beam 中的 BigQueryIO.read 操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45419994/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com