- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我最近将现有管道从数据流 1.x 升级到数据流 2.x,我看到一个对我来说没有意义的错误。我会将相关代码放在下面,然后包括我看到的错误。
// This is essentially the final step in our pipeline, where we write
// one of the side outputs from the pipeline to a BigQuery table
results.get(matchedTag)
.apply("CountBackfill", Count.<String>perElement())
.apply("ToReportRow", ParDo.of(new ToReportRow()))
// at this point, there is now a PCollection<TableRow>
.apply("WriteReport", BigQueryIO.writeTableRows()
.to(reportingDataset + ".AttributeBackfill_" + dayStr)
.withSchema(ReportSchema.get())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
/*
* Create a TableRow from a key/value pair
*/
public static class ToReportRow extends DoFn<KV<String, Long>, TableRow> {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws InterruptedException {
KV<String, Long> row = c.element();
c.output(new TableRow()
.set(ReportSchema.ID, row.getKey())
.set(ReportSchema.COUNT, row.getValue()));
}
}
这是我看到的错误:
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)V at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1426) at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:989) at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:525) at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:479) at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:297) at com.prod.merge.DailyUniqueProfiles.buildPipeline(DUP.java:106) at com.prod.merge.MergePipeline.main(MergePipeline.java:91)
行 .apply("WriteReport", BigQueryIO.writeTableRows()
是 DUP.java
中的第 106 行,所以我怀疑这是错误的行。
关于问题可能是什么的任何想法?
最佳答案
这个问题的解决方案最终出现在 maven 依赖项中。添加以下依赖项并使用 mvn
重新编译后,错误消失了。
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
关于java - Apache Beam,BigQueryIO.WriteTableRows() 上的 NoSuchMethodError?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48067265/
我想做的是读取一个现有表并生成一个新表,该表具有与原始表相同的架构以及一些额外的列(从原始表的某些列计算得出)。可以在不通知我的情况下增加原始表架构(我在数据流作业中使用的字段不会更改),因此我希望始
我已经研究数据流/bigquery有一段时间了,但我仍然无法理解一些基本的事情,即何时使用某种类型的方法来查询表。 BigQueryIO.Read 的单行查询选项是: + 简短, + 适合大型结果,在
我有一个从 BigQuery 表读取数据的数据流管道。但是,在读取数据时,除了使用 read(SerializableFunction) 读取所有记录外别无选择。或 readTableRows()方法
我在查询位于欧盟的 BigQuery 表/数据集时遇到问题。此处报告了该问题:https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues
我的 Beam 管道中有一个 BigQueryIO.Write 阶段,它是通过调用 .withJsonSchema(String) 构建的: inputStream.apply( "save-
我想从 Cloud Pub/Sub 读取数据并使用 Cloud Dataflow 将其写入 BigQuery。每个数据都包含一个表 ID,数据本身将保存在其中。 写入 BigQuery 失败的因素有多
我注意到的一件事是 BigQueryIO.read().fromQuery() 的性能比 Apache Beam 中的 BigQueryIO.read().from() 的性能要慢得多。为什么会发生这
我的流数据流管道从 PubSub 提取数据,不会写入 BigQuery,也不会记录任何错误。这些元素进入节点“Write to BigQuery/StreamingInserts/StreamingW
我的管道:Kafka -> 数据流流 (Beam v2.3) -> BigQuery 鉴于低延迟对我来说并不重要,我使用 FILE_LOADS降低成本,像这样: BigQueryIO.writeTab
我最近将现有管道从数据流 1.x 升级到数据流 2.x,我看到一个对我来说没有意义的错误。我会将相关代码放在下面,然后包括我看到的错误。 // This is essentially the fina
这在文档中并不清楚,但看起来像 BigQueryIO.write执行流式写入,这又是limits the row size to 20KB)记录,我们在Stack Overflow上找到一个类似的问题
Apache Beam 2.1.0 存在从 BigQuery 读取的模板管道的错误,这意味着它们只能执行一次。更多详情请点击 https://issues.apache.org/jira/browse
我收到以下控制台日志,并且进程停止 com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions INFO: Pip
我们使用以下代码将记录写入 BigQuery: BigQueryIO.writeTableRows() .to("table") .withCreateDisposition(BigQ
由此我明白了thread使用“.fromQuery”比“.from”更昂贵且更慢,但是如果我需要从多个表中检索数据该怎么办? 目前我正在使用“INNER JOIN”查询来执行此操作,但如何使用“.fr
当您需要在数据流作业中从 bigquery 中的一个或多个表中读取所有数据时,我会说有两种方法。第一种方法是将 BigQueryIO 与 from 一起使用,它读取有问题的表,第二种方法是使用 fro
有没有办法将侧输入应用于 Apache Beam 中的 BigQueryIO.read() 操作。 举例来说,我在 PCollection 中有一个值,我想在查询中使用它来从 BigQuery 表中获
Apache 光束2.9.0 我已经建立了一个管道,从 BigQuery 中提取数据并对其进行一系列转换。这些选项使用 ValueProvider 附加了开始日期。 : ValueProvider g
是否有人与我遇到同样的问题,即 Google Cloud Dataflow BigQueryIO.Write 发生未知错误(http 代码 500)? 我使用Dataflow处理四月、五月、六月的一些
我的工作流程:KAFKA -> 数据流流 -> BigQuery 鉴于低延迟对我来说并不重要,我使用 FILE_LOADS 来降低成本。我将 BigQueryIO.Write 与 DynamicDes
我是一名优秀的程序员,十分优秀!