
java - Azure Blob Storage with Apache Flink 1.10


I am trying to use Azure Blob Storage with Apache Flink 1.10 for checkpointing.

I followed all the instructions mentioned in the Flink documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/filesystems/azure.html

Step 1:

mkdir ./plugins/azure-fs-hadoop
cp ./opt/flink-azure-fs-hadoop-1.10.0.jar ./plugins/azure-fs-hadoop/
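
For reference, each filesystem plugin has to sit in its own subdirectory under plugins/. After step 1 the layout should look roughly like this (a sketch, assuming the default Flink 1.10.0 distribution layout):

```text
flink-1.10.0/
├── lib/
├── opt/
│   └── flink-azure-fs-hadoop-1.10.0.jar
└── plugins/
    └── azure-fs-hadoop/
        └── flink-azure-fs-hadoop-1.10.0.jar
```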

Step 2:

This is what I have in flink-conf.yaml:

#Azure Storage Key
fs.azure.account.key.<storage-account>.blob.core.windows.net: xxxxxxxxxxxxxxxxxx
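
For context, checkpointing can also be configured entirely through flink-conf.yaml instead of in code. A minimal sketch, reusing the container and account placeholders from this question and the standard state.backend / state.checkpoints.dir options:

```yaml
# Azure Storage access key for the wasb:// filesystem (placeholder value)
fs.azure.account.key.<storage-account>.blob.core.windows.net: xxxxxxxxxxxxxxxxxx

# Optional alternative to setting the state backend programmatically:
state.backend: filesystem
state.checkpoints.dir: wasb://flink-blob@<storage-account>.blob.core.windows.net/checkpoint
```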

Step 3:

Checkpointing with Azure Blob Storage.

This is what I have in my Flink job:

final StateBackend stateBackend = new FsStateBackend("wasb://flink-blob@<storage-account>.blob.core.windows.net/checkpoint");
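
For completeness, a minimal sketch of how this backend is wired into a streaming job (the container name flink-blob and the <storage-account> placeholder come from above; the checkpoint interval is an arbitrary illustrative value and CheckpointExample is only a stand-in for the actual job class):

```java
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 seconds (illustrative interval).
        env.enableCheckpointing(60_000L);

        // Write checkpoints to the Azure Blob Storage container configured in flink-conf.yaml.
        final StateBackend stateBackend =
                new FsStateBackend("wasb://flink-blob@<storage-account>.blob.core.windows.net/checkpoint");
        env.setStateBackend(stateBackend);

        // ... sources, transformations and sinks of the actual job go here ...

        env.execute("azure-checkpointing-example");
    }
}
```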

I am not sure whether I am missing anything here, but when I submit the job I get the following exception (an AskTimeoutException from an actor):

    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1741)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at com.example.flink.checkpointing.CheckpointExample.main(CheckpointExample.java:78)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 8 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1736)
... 17 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#75666936]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
at java.base/java.lang.Thread.run(Thread.java:834)

End of exception on server side>]
at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
... 4 more

Best Answer

I think your problem is twofold. Due to the AskTimeoutException, the real failure cause is hidden. This problem has been fixed with FLINK-16018, which will be released with Flink 1.10.1. The issue was that the timeout value was so aggressive that persisting the submitted job on the client side could fail.
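
Until 1.10.1 is available, one possible workaround (my assumption, not stated in the answer above) is to raise the affected timeouts in flink-conf.yaml so that the client-side submission gets more time, for example:

```yaml
# Illustrative values only; tune for your environment.
web.timeout: 60000      # timeout for REST/web handlers, in milliseconds
akka.ask.timeout: 60 s  # RPC ask timeout
```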

For the real failure cause, I recommend looking into Flink's jobmanager.log. It should contain information about what went wrong. My suspicion is that the Azure Blob Storage configuration is wrong.
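
A quick way to surface that information is to search the JobManager log for the first exception around the time of the failed submission, for example (the log path and file name pattern depend on the deployment; this assumes a standalone cluster writing to <flink-dir>/log/):

```sh
# Show exceptions with some surrounding context from the JobManager log
grep -n -A 20 -i "exception" log/flink-*.log | less
```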

Regarding java - Azure Blob Storage with Apache Flink 1.10, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/61211809/
