
google-cloud-dataflow - PubsubReader fails with NullPointerException when run with InProcessPipelineRunner

Reposted. Author: 行者123. Updated: 2023-12-02 03:08:03

I have a simple pipeline that does nothing but read from PubsubIO.Read.subscription. After consuming roughly 200 elements, every run fails with the following exception:

[error] (run-main-0) java.lang.RuntimeException: java.lang.NullPointerException
java.lang.RuntimeException: java.lang.NullPointerException
at com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:281)
at com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:69)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:181)
at com.sandbox.WriteLogsToBQ.main(WriteLogsToBQ.java:296)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Caused by: java.lang.NullPointerException
at com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource$PubsubReader.ackBatch(PubsubUnboundedSource.java:612)
at com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource$PubsubCheckpoint.finalizeCheckpoint(PubsubUnboundedSource.java:297)
at com.google.cloud.dataflow.sdk.runners.inprocess.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.startReader(UnboundedReadEvaluatorFactory.java:203)
at com.google.cloud.dataflow.sdk.runners.inprocess.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.finishBundle(UnboundedReadEvaluatorFactory.java:172)
at com.google.cloud.dataflow.sdk.runners.inprocess.TransformExecutor.finishBundle(TransformExecutor.java:163)
at com.google.cloud.dataflow.sdk.runners.inprocess.TransformExecutor.run(TransformExecutor.java:119)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

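I am using SDK version 1.9.0. This did not happen with 1.6.1, which successfully fetched more than 10k elements. Does anyone know a workaround? I also noticed that 1.9.0 fetches much faster than 1.6.1; with 1.6.1 it appeared to use batches of 10 elements.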


I also tested reading from a topic:

p.apply(PubsubIO.Read.named("reading_topic_test").topic("projects/***/topics/SL_LogLogin"))

The subscription is created automatically, but the pipeline fails:

Jan 05, 2017 2:06:33 PM com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource apply
WARNING: Created subscription null to topic NestedValueProvider{value=NestedValueProvider{value=StaticValueProvider{value=projects/***/topics/SL_LogLogin}}}.
Note this subscription WILL NOT be deleted when the pipeline terminates
[error] (run-main-0) com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder$PopulateDisplayDataException:
Error while populating display data for component:com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource$StatsFn
com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder$PopulateDisplayDataException:
Error while populating display data for component: com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource$StatsFn
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:664)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:643)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:637)
at com.google.cloud.dataflow.sdk.transforms.ParDo.populateDisplayData(ParDo.java:1266)
at com.google.cloud.dataflow.sdk.transforms.ParDo.access$200(ParDo.java:457)
at com.google.cloud.dataflow.sdk.transforms.ParDo$Bound.populateDisplayData(ParDo.java:816)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:657)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:643)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:637)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.forRoot(DisplayData.java:630)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.access$000(DisplayData.java:617)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData.from(DisplayData.java:76)
at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.evaluateDisplayData(DisplayDataValidator.java:47)
at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.access$100(DisplayDataValidator.java:29)
at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator$Visitor.visitTransform(DisplayDataValidator.java:62)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:103)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:260)
at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.validateTransforms(DisplayDataValidator.java:43)
at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.validatePipeline(DisplayDataValidator.java:35)
at com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:245)
at com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:69)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:181)
at com.sandbox.WriteLogsToBQ.main(WriteLogsToBQ.java:303)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Caused by: java.lang.NullPointerException: Input display value cannot be null
at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:228)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.addItemIf(DisplayData.java:707)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.add(DisplayData.java:685)
at com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource$StatsFn.populateDisplayData(PubsubUnboundedSource.java:1147)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:657)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:643)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:637)
at com.google.cloud.dataflow.sdk.transforms.ParDo.populateDisplayData(ParDo.java:1266)
at com.google.cloud.dataflow.sdk.transforms.ParDo.access$200(ParDo.java:457)
at com.google.cloud.dataflow.sdk.transforms.ParDo$Bound.populateDisplayData(ParDo.java:816)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:657)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:643)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:637)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.forRoot(DisplayData.java:630)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.access$000(DisplayData.java:617)
at com.google.cloud.dataflow.sdk.transforms.display.DisplayData.from(DisplayData.java:76)
at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.evaluateDisplayData(DisplayDataValidator.java:47)
at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.access$100(DisplayDataValidator.java:29)
at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator$Visitor.visitTransform(DisplayDataValidator.java:62)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:103)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:260)
at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.validateTransforms(DisplayDataValidator.java:43)
at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.validatePipeline(DisplayDataValidator.java:35)
at com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:245)
at com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:69)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:181)
at com.sandbox.WriteLogsToBQ.main(WriteLogsToBQ.java:303)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)

Best answer

This is fixed by https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/547. Sorry for the trouble.

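The root cause was a backporting error between incubator-beam and the Google Dataflow SDK: the subscription returned in the per-job subscription case was never initialized.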
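
To make that class of bug concrete, here is a minimal sketch of how an uninitialized subscription can surface later as a NullPointerException. It is not the SDK's code: the class SubscriptionInitSketch, its methods resolveBuggy, resolveFixed and ack, and the topic name projects/demo/topics/logs are all hypothetical names invented for illustration. It only assumes, as the answer describes, that one code path creates a per-job subscription but does not hand it on.

```java
import java.util.Objects;

// Hypothetical illustration only: these classes are NOT the Dataflow SDK's code.
public class SubscriptionInitSketch {

    /** A source configured with a topic and, optionally, an existing subscription. */
    static final class Source {
        private final String topic;
        private final String subscription; // null when only a topic was supplied

        Source(String topic, String subscription) {
            this.topic = Objects.requireNonNull(topic, "topic");
            this.subscription = subscription;
        }

        /** Stand-in for creating a per-job subscription on the service. */
        private String createSubscription() {
            return topic.replace("/topics/", "/subscriptions/") + "_generated";
        }

        /** Buggy path: the new subscription is created but its name is dropped. */
        String resolveBuggy() {
            if (subscription == null) {
                createSubscription(); // return value ignored
            }
            return subscription;
        }

        /** Fixed path: the created subscription is what gets returned. */
        String resolveFixed() {
            return subscription != null ? subscription : createSubscription();
        }
    }

    /** Stand-in for reader code that dereferences the subscription when acking. */
    static int ack(String subscription, int messageCount) {
        // Throws NullPointerException if the subscription was never initialized.
        return subscription.isEmpty() ? 0 : messageCount;
    }

    public static void main(String[] args) {
        Source source = new Source("projects/demo/topics/logs", null);
        System.out.println("fixed: acked " + ack(source.resolveFixed(), 3));
        try {
            ack(source.resolveBuggy(), 3);
        } catch (NullPointerException e) {
            System.out.println("buggy: NullPointerException");
        }
    }
}
```

In this sketch the failure appears only when the value is first dereferenced, far from the place where it should have been set. That would be consistent with the "Created subscription null" warning in the log above, although the actual fix is whatever the linked pull request contains.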

This question, "google-cloud-dataflow - PubsubReader fails with NullPointerException when run with InProcessPipelineRunner", corresponds to a similar question we found on Stack Overflow: https://stackoverflow.com/questions/41465231/
