- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个从 pubsub 获得的对象 PCollection,比方说:
PCollection<Student> pStudent ;
在学生属性中,有一个属性,比如studentID;我想使用此学生 ID 从 BigQuery 读取属性 (class_code),并将从 BQ 获取的 class_code 设置为 PCollcetion 中的学生对象
有人知道如何实现吗?我知道在 Beam 中有一个 BigQueryIO
但如果我想在 BQ 中执行的查询字符串条件来自学生对象 (studentID),我该怎么做呢? PCollection 以及如何将 BigQuery 结果中的值设置为 PCollection?
最佳答案
我考虑了两种选择来做到这一点。一种方法是使用 BigQueryIO 读取整个表并将其具体化为辅助输入,或者使用 CoGroupByKey 来连接所有数据。另一种可能性,即我在此实现的可能性,是直接使用 Java 客户端库。
我创建了一些虚拟数据:
$ bq mk test.students name:STRING,grade:STRING
$ bq query --use_legacy_sql=false 'insert into test.students (name, grade) values ("Yoda", "A+"), ("Leia", "B+"), ("Luke", "C-"), ("Chewbacca", "F")'
看起来像这样:
然后,在管道内,我生成一些输入虚拟数据:
Create.of("Luke", "Leia", "Yoda", "Chewbacca")
对于这些“学生”中的每一位,我都按照 this example 中的方法在 BigQuery 表中获取相应的成绩。 。根据之前的评论,根据您的数据量、速率(配额)和成本考虑因素进行考虑。完整示例:
public class DynamicQueries {
private static final Logger LOG = LoggerFactory.getLogger(DynamicQueries.class);
@SuppressWarnings("serial")
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
// create input dummy data
PCollection<String> students = p.apply("Read students data", Create.of("Luke", "Leia", "Yoda", "Chewbacca").withCoder(StringUtf8Coder.of()));
// ParDo to map each student with the grade in BigQuery
PCollection<KV<String, String>> marks = students.apply("Read marks from BigQuery", ParDo.of(new DoFn<String, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(
"SELECT name, grade "
+ "FROM `PROJECT_ID.test.students` "
+ "WHERE name = "
+ "\"" + c.element() + "\" " // fetch the appropriate student
+ "LIMIT 1")
.setUseLegacySql(false) // Use standard SQL syntax for queries.
.build();
// Create a job ID so that we can safely retry.
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
// Wait for the query to complete.
queryJob = queryJob.waitFor();
// Check for errors
if (queryJob == null) {
throw new RuntimeException("Job no longer exists");
} else if (queryJob.getStatus().getError() != null) {
throw new RuntimeException(queryJob.getStatus().getError().toString());
}
// Get the results.
QueryResponse response = bigquery.getQueryResults(jobId)
TableResult result = queryJob.getQueryResults();
String mark = new String();
for (FieldValueList row : result.iterateAll()) {
mark = row.get("grade").getStringValue();
}
c.output(KV.of(c.element(), mark));
}
}));
// log to check everything is right
marks.apply("Log results", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("Element: " + c.element().getKey() + " " + c.element().getValue());
c.output(c.element());
}
}));
p.run();
}
}
输出是:
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Yoda A+
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Luke C-
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Chewbacca F
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Leia B+
(使用 BigQuery 1.22.0 和 2.5.0 Java SDK for Dataflow 进行测试)
关于google-bigquery - 如何从数据流中的PCollection读取bigQuery,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53202450/
我有一个包含数据的表,其中在 A 列中我有一组重复的数据(一个接一个)。 我只想根据 A 列中的值(没有其他条件)选择每个组的第一行。请注意,我还希望为提到的新发现的行选择所有相应的列(我不想排除它们
我有一个包含 340GB 数据的表,但我们只使用了最后一周的数据。因此,为了最小化将这些数据移动到分区表或分片表的成本计划。 我对分片表和分区做了一些实验。我创建了分区表并加载了两天的数据(两个分区)
我想安排将数据从 GCS 存储桶加载到 BigQuery 表。如果我使用 bigquery-transfer 与调度及时的 bigquery-loads 的核心区别是什么? 最佳答案 它们是相同的。
我想安排将数据从 GCS 存储桶加载到 BigQuery 表。如果我使用 bigquery-transfer 与调度及时的 bigquery-loads 的核心区别是什么? 最佳答案 它们是相同的。
我想编写一个 BigQuery 命令行命令来检索 BigQuery 表的最后修改时间。我怎样才能做到这一点? 仅当 BigQuery 表的最后修改日期时间大于某个日期时间时,我才会使用它。 最佳答案
我似乎无法将任何数据从 Socrata 上传到 BigQuery。我收到“加载操作中的 BigQuery 错误:无法连接 BigQuery 服务器。”最初我得到的是 0 错误错误的限制。现在我已将 C
我正在尝试弄清楚是否可以从大查询中导出 hyperloglog 草图并在外部合并它们以进行基数估计。是否有可用的开源库可以轻松解析大型查询草图? 如果不是,是否有任何关于 biq 查询的 hyperl
这是我用作https://cloud.google.com/bigquery/docs/managing-tables#bigquery-copy-table-python的引用的代码: source
构建管道时,源是 BigQueryIO.Read,您会得到一组 TableRow 对象以供使用。 我基本上想对那些 TableRow 对象进行一些小的更改,然后使用 BigQueryIO.Write
BigQuery API Client Libraries 之间有什么区别?和 BigQuery Storage API Client Libraries ? 在 BigQuery Storage R
据我所知,将数据流式传输到 BigQuery 会导致重复行,正如这里提到的 https://cloud.google.com/bigquery/streaming-data-into-bigquery
我在 BigQuery Jobs API 中注意到复制任务: https://developers.google.com/bigquery/docs/reference/v2/jobs#resourc
https://cloud.google.com/bigquery/docs/reference/datatransfer/rest/ 我正在寻找“bigquery 数据传输服务”的 php 客户端库
我正在从 GCS 中的 CSV 文件到 BQ 执行一些 ETL,一切正常,除了日期。我的表中的字段名称是 TEST_TIME,类型是 DATE,所以在 TableRow 中我尝试传递一个 java.u
我已经阅读了 BigQuery 连接器的文档(https://support.google.com/360suite/datastudio/answer/6370296?hl=en)。 我想将自定义查
当两个不同的billing account下有两个project,并且有跨两个project的授权view时,view的查询费用由哪个billing account来计费? 场景:项目 A 包含使用项
所以我有一张购买表: 用户编号 购买时间 数量 我有一张网站上的用户事件表: 用户编号 位置 浏览时间 如何在不超过 purchase_time 的情况下将 purchases 表与 activiti
我有一个 unix 时间戳列,在我的 csv 文件中以毫秒表示。现在,当我将这些数据插入到我的 bigQuery 表中并查询它时,我得到了这个错误 bigQuery not supporting mi
我目前正在将 BigQuery 表提取到 Google Cloud Storage 中的分片 .csv 中——是否有任何方法可以对提取的行进行洗牌/随机化? GCS .csv 将用作 GCMLE 模型
我需要从数据流更新和删除 BigQuery 中的记录。数据来自 Pubsub,并带有标识操作插入、更新、删除 (I、U、D) 的标志。插入不是问题。 有更新和删除的建议吗? 最佳答案 Dataflow
我是一名优秀的程序员,十分优秀!