
google-bigquery - How to read from BigQuery based on a PCollection in Dataflow

Reposted · Author: 行者123 · Updated: 2023-12-01 15:27:50

I have a PCollection of objects obtained from Pub/Sub, say:

 PCollection<Student> pStudent ;

Among the Student attributes there is a field, say studentID. I want to use this student ID to read an attribute (class_code) from BigQuery, and then set the class_code fetched from BQ on the Student objects in the PCollection.
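For context, a minimal sketch of what such a Student element type might look like (the field names here are assumptions, not from the question; Beam element types must be serializable or have a registered coder):

```java
import java.io.Serializable;

// Hypothetical Student element type; field names are assumptions.
// Beam elements must be serializable (or have a registered coder).
class Student implements Serializable {
    private final String studentID;
    private String classCode; // to be filled in from BigQuery

    Student(String studentID) {
        this.studentID = studentID;
    }

    String getStudentID() { return studentID; }

    String getClassCode() { return classCode; }

    void setClassCode(String classCode) { this.classCode = classCode; }
}
```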

Does anyone know how to implement this? I know Beam has BigQueryIO, but how do I do it when the condition in the query I want to run against BQ comes from a value in the PCollection (studentID), and how do I set the value from the BigQuery result back onto the objects in the PCollection?

Best Answer

I considered two options to do this. One is to read the whole table with BigQueryIO and either materialize it as a side input or use CoGroupByKey to join all the data. The other possibility, the one I implemented here, is to use the Java client library directly.
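The first alternative boils down to a per-element map lookup: once the whole test.students table has been materialized (e.g. via a full BigQueryIO read turned into a side input with View.asMap), enriching each element is just a dictionary get inside the DoFn. A minimal plain-Java sketch of that lookup step, with a hard-coded map standing in for the side input:

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

class SideInputLookupSketch {
    // Enrich one element using a pre-materialized name -> grade map.
    // In the real pipeline this map would be a Beam side input built
    // from a full BigQueryIO read of the table, not a HashMap.
    static Map.Entry<String, String> lookupGrade(String name, Map<String, String> grades) {
        return new AbstractMap.SimpleEntry<>(name, grades.getOrDefault(name, ""));
    }

    public static void main(String[] args) {
        Map<String, String> grades = new HashMap<>();
        grades.put("Luke", "C-");
        grades.put("Leia", "B+");
        System.out.println(lookupGrade("Luke", grades)); // prints "Luke=C-"
    }
}
```

The trade-off versus the per-element query below: the side input reads the table once (cheap, but the whole table must fit in memory per worker), while querying per element stays fresh but is subject to quotas and per-query cost.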

I created some dummy data:

$ bq mk test.students name:STRING,grade:STRING
$ bq query --use_legacy_sql=false 'insert into test.students (name, grade) values ("Yoda", "A+"), ("Leia", "B+"), ("Luke", "C-"), ("Chewbacca", "F")'


Then, inside the pipeline, I generate some dummy input data:

Create.of("Luke", "Leia", "Yoda", "Chewbacca")

For each of these "students" I fetch the corresponding grade from the BigQuery table, following the approach in this example. Per the previous comments, weigh this pattern against your data volume, request rate (quotas) and cost considerations. Full example:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import java.util.UUID;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DynamicQueries {

    private static final Logger LOG = LoggerFactory.getLogger(DynamicQueries.class);

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        // Create input dummy data
        PCollection<String> students = p.apply("Read students data",
            Create.of("Luke", "Leia", "Yoda", "Chewbacca").withCoder(StringUtf8Coder.of()));

        // ParDo to map each student to the grade stored in BigQuery
        PCollection<KV<String, String>> marks = students.apply("Read marks from BigQuery",
            ParDo.of(new DoFn<String, KV<String, String>>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

                    QueryJobConfiguration queryConfig =
                        QueryJobConfiguration.newBuilder(
                                "SELECT name, grade "
                                    + "FROM `PROJECT_ID.test.students` "
                                    + "WHERE name = "
                                    + "\"" + c.element() + "\" " // fetch the appropriate student
                                    + "LIMIT 1")
                            .setUseLegacySql(false) // use standard SQL syntax for queries
                            .build();

                    // Create a job ID so that we can safely retry
                    JobId jobId = JobId.of(UUID.randomUUID().toString());
                    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

                    // Wait for the query to complete
                    queryJob = queryJob.waitFor();

                    // Check for errors
                    if (queryJob == null) {
                        throw new RuntimeException("Job no longer exists");
                    } else if (queryJob.getStatus().getError() != null) {
                        throw new RuntimeException(queryJob.getStatus().getError().toString());
                    }

                    // Get the results
                    TableResult result = queryJob.getQueryResults();

                    String mark = "";

                    for (FieldValueList row : result.iterateAll()) {
                        mark = row.get("grade").getStringValue();
                    }

                    c.output(KV.of(c.element(), mark));
                }
            }));

        // Log the results to check everything is right
        marks.apply("Log results", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                LOG.info("Element: " + c.element().getKey() + " " + c.element().getValue());
                c.output(c.element());
            }
        }));

        p.run();
    }
}

The output is:

Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Yoda A+
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Luke C-
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Chewbacca F
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Leia B+

(Tested with the BigQuery client library 1.22.0 and the 2.5.0 Java SDK for Dataflow)
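One caveat about the example above: it concatenates c.element() directly into the query string. With untrusted input that invites SQL injection (the BigQuery client library also supports named query parameters, which sidestep the problem entirely). As a minimal stdlib-only sketch, using a hypothetical helper that is not part of the original answer, the value can at least be escaped before concatenation:

```java
class SqlLiteral {
    // Hypothetical helper: wrap a value as a standard SQL string literal,
    // escaping backslashes and double quotes.
    static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        System.out.println("WHERE name = " + quote("Luke")); // prints: WHERE name = "Luke"
    }
}
```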

On google-bigquery - How to read from BigQuery based on a PCollection in Dataflow, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53202450/
