
java - DataflowRunner requires gcpTempLocation, but failed to retrieve a value from PipelineOptions

Reprinted · Author: 太空宇宙 · Updated: 2023-11-04 09:32:20

I am building a demo pipeline to load a CSV file into BigQuery via Dataflow, using my free Google account. Here is what I am running into.

When I simply read the GCS file and log the data, everything works fine. Below is my sample code.

This code runs correctly:

DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject("project12345");
options.setStagingLocation("gs://mybucket/staging");
options.setRunner(DataflowRunner.class);
DataflowRunner.fromOptions(options);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("gs://mybucket/charges.csv")).apply(ParDo.of(new DoFn<String, Void>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        LOG.info(c.element());
    }
}));

However, when I add a temp folder location pointing at a bucket I created, I get an error. Below is my code:


LOG.debug("Starting Pipeline");
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject("project12345");
options.setStagingLocation("gs://mybucket/staging");
options.setTempLocation("gs://project12345/temp");
options.setJobName("csvtobq");
options.setRunner(DataflowRunner.class);

DataflowRunner.fromOptions(options);
Pipeline p = Pipeline.create(options);

boolean isStreaming = false;
TableReference tableRef = new TableReference();
tableRef.setProjectId("project12345");
tableRef.setDatasetId("charges_data");
tableRef.setTableId("charges_data_id");

p.apply("Loading Data from GCS", TextIO.read().from("gs://mybucket/charges.csv"))
        .apply("Convert to BigQuery Table Row", ParDo.of(new FormatForBigquery()))
        .apply("Write Data into BigQuery",
                BigQueryIO.writeTableRows().to(tableRef).withSchema(FormatForBigquery.getSchema())
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(isStreaming ? BigQueryIO.Write.WriteDisposition.WRITE_APPEND
                                : BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

p.run().waitUntilFinish();

When I run this, I get the following error:

Exception in thread "main" java.lang.IllegalArgumentException: DataflowRunner requires gcpTempLocation, but failed to retrieve a value from PipelineOptions
at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:242)
at demobigquery.StarterPipeline.main(StarterPipeline.java:74)
Caused by: java.lang.IllegalArgumentException: Error constructing default value for gcpTempLocation: tempLocation is not a valid GCS path, gs://project12345/temp.
at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:247)
at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:228)
at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:155)
at com.sun.proxy.$Proxy15.getGcpTempLocation(Unknown Source)
at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:240)

Is this an authentication issue? I am using JSON credentials as the project owner of the GCP project, via the Eclipse Dataflow plugin.

Any help would be greatly appreciated.

Best Answer

The error message appears to be thrown from [1]. The default GCS validator is implemented in [2]. As you can see, that Beam code also throws an IllegalArgumentException, so you can inspect further down the stack trace to find the underlying exception raised inside GcsPathValidator.

[1] https://github.com/apache/beam/blob/master/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java#L278

[2] https://github.com/apache/beam/blob/master/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java#L29
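To rule out a malformed path before chasing credentials, you can approximate the syntactic part of that validation yourself. The sketch below is my own illustrative check, not Beam's actual GcsPathValidator logic: it tests whether a string looks like a gs://bucket/object path with a valid lowercase bucket name. The gs://project12345/temp path from the question passes this kind of check, which suggests the failure is more likely bucket existence or access permissions than path syntax.

```java
// Illustrative only: a rough syntactic check for GCS paths, loosely
// mirroring what a validator would reject as "not a valid GCS path".
// Beam's real validation also verifies bucket access, which this does not.
public class GcsPathCheck {
    // GCS bucket names use lowercase letters, digits, dashes, underscores,
    // and dots; 3-63 chars; must start and end with a letter or digit.
    private static final java.util.regex.Pattern GCS_PATH =
            java.util.regex.Pattern.compile("^gs://[a-z0-9][a-z0-9._-]{1,61}[a-z0-9](/.*)?$");

    public static boolean looksLikeGcsPath(String path) {
        return path != null && GCS_PATH.matcher(path).matches();
    }

    public static void main(String[] args) {
        System.out.println(looksLikeGcsPath("gs://project12345/temp")); // true
        System.out.println(looksLikeGcsPath("project12345/temp"));      // false: missing gs:// scheme
        System.out.println(looksLikeGcsPath("gs://MyBucket/temp"));     // false: uppercase bucket name
    }
}
```

Since the path itself is well-formed, it is also worth setting the location explicitly with `options.setGcpTempLocation("gs://project12345/temp")` (a setter inherited from GcpOptions), which bypasses the default derivation from tempLocation, and confirming that the bucket actually exists and is readable with the credentials the Eclipse plugin is using.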

Regarding java - DataflowRunner requires gcpTempLocation, but failed to retrieve a value from PipelineOptions, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56937534/
