
java - Error creating a Dataflow template with TextIO and ValueProvider

Reposted. Author: 行者123. Updated: 2023-11-30 02:10:13

I am trying to create a Google Dataflow template, but I cannot find a way to do it without producing the following exception:

WARNING: Size estimation of the source failed: RuntimeValueProvider{propertyName=inputFile, default=null}
java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=inputFile, default=null}
at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:234)
at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:218)
at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:78)
at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:53)
at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:40)
at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:37)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:453)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:392)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:170)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:680)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at org.apache.beam.examples.MyMinimalWordCount.main(MyMinimalWordCount.java:69)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
at java.lang.Thread.run(Thread.java:748)

I can reproduce it with a lightly modified version of Beam's MinimalWordCount example.

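package org.apache.beam.examples;

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class MyMinimalWordCount {

    public interface WordCountOptions extends PipelineOptions {
        // Declared as a ValueProvider so the value can be supplied when the template is run.
        @Description("Path of the file to read from")
        ValueProvider<String> getInputFile();

        void setInputFile(ValueProvider<String> valueProvider);
    }

    public static void main(String[] args) {
        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);

        Pipeline p = Pipeline.create(options);

        // The input path is passed as a ValueProvider, not as a resolved String.
        p.apply(TextIO.read().from(options.getInputFile()))
            .apply(FlatMapElements
                .into(TypeDescriptors.strings())
                .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
            .apply(Filter.by((String word) -> !word.isEmpty()))
            .apply(Count.perElement())
            .apply(MapElements
                .into(TypeDescriptors.strings())
                .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
            .apply(TextIO.write().to("wordcounts"));

        // Having the waitUntilFinish causes an NPE when trying to create a Dataflow template
        // p.run().waitUntilFinish();

        p.run();
    }
}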

I can run the example locally with the following command:

mvn compile exec:java \
-Pdirect-runner \
-Dexec.mainClass=org.apache.beam.examples.MyMinimalWordCount \
-Dexec.args="--inputFile=pom.xml "

It also runs on Google Dataflow:

mvn compile exec:java \
-Pdataflow-runner \
-Dexec.mainClass=org.apache.beam.examples.MyMinimalWordCount \
-Dexec.args="--runner=DataflowRunner \
--project=[project] \
--inputFile=gs://[bucket]/input.csv "

But when I try to create a Google Dataflow template with the following command, I get the error:

mvn compile exec:java \
-Pdataflow-runner \
-Dexec.mainClass=org.apache.beam.examples.MyMinimalWordCount \
-Dexec.args="--runner=DataflowRunner \
--project=[project] \
--stagingLocation=gs://[bucket]/staging \
--templateLocation=gs://[bucket]/templates/MyMinimalWordCountTemplate "

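Another confusing thing is that the Maven build carries on and ends with BUILD SUCCESS.

So my questions are:

Q1) Should I be able to create a Google Dataflow template like this (using a ValueProvider to supply the TextIO input at runtime)?

Q2) Is the exception during the build a real error, or just a warning (as the logging suggests)?

Q3) If the answers to Q1 and Q2 are "yes" and "just a warning", and I try to create a job from the uploaded template, why does it have no metadata and not know about my input options?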

Google Dataflow screenshot


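Best answer

The correct answer is that you do not have to provide the input when creating the template; the input should be supplied as a value at runtime. The exception is an internal Google Dataflow issue and should be removed in a future release.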
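
To illustrate why the exception appears only as a warning and the build still succeeds, here is a minimal plain-Java sketch of the idea. It does not use Beam: `Deferred`, `known`, `runtimeOnly` and `estimateSize` are hypothetical names invented for this illustration, and the "size" is just a placeholder. The sketch assumes, based on the "WARNING: Size estimation of the source failed" line in the log above, that the size estimate is a best-effort step whose failure is logged and then ignored.

```java
import java.util.Optional;

/** Simplified stand-in for the deferred-value idea; these are NOT Beam's classes. */
final class DeferredValueSketch {

    /** Hypothetical analogue of a value provider: get() may be illegal at build time. */
    interface Deferred<T> {
        T get();
        boolean isAccessible();
    }

    /** Value known when the pipeline is constructed (for example, --inputFile was passed). */
    static <T> Deferred<T> known(T value) {
        return new Deferred<T>() {
            public T get() { return value; }
            public boolean isAccessible() { return true; }
        };
    }

    /** Value that only exists once a job is launched from the template. */
    static <T> Deferred<T> runtimeOnly(String propertyName) {
        return new Deferred<T>() {
            public T get() {
                throw new IllegalStateException(
                    "Value only available at runtime: " + propertyName);
            }
            public boolean isAccessible() { return false; }
        };
    }

    /**
     * Best-effort size estimate. A failure is reported as a warning and
     * swallowed, so the caller (the template build) carries on.
     */
    static Optional<Long> estimateSize(Deferred<String> path) {
        try {
            // Placeholder "size": the length of the path string.
            return Optional.of((long) path.get().length());
        } catch (IllegalStateException e) {
            System.err.println(
                "WARNING: Size estimation of the source failed: " + e.getMessage());
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Normal run: the input is known, so an estimate is produced.
        System.out.println(estimateSize(known("pom.xml")).isPresent());
        // Template build: no input yet, so only a warning is printed.
        System.out.println(estimateSize(runtimeOnly("inputFile")).isPresent());
    }
}
```

In this sketch the second call prints a warning to stderr and returns an empty result instead of failing, which mirrors what the question observes: a stack trace in the log, followed by BUILD SUCCESS.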

Source: a similar question on Stack Overflow about creating a Dataflow template with TextIO and ValueProvider: https://stackoverflow.com/questions/50344150/
