
apache-flink - Two questions about Flink externalized checkpoints

Reposted · Author: 行者123 · Updated: 2023-12-03 11:11:34

I have two questions about Flink externalized checkpoints.

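(Q1) I can get externalized checkpoints working by setting "state.checkpoints.dir" in flink-conf.yaml, but how do I achieve the same thing when running Flink from the IDE? I tried the GlobalConfiguration approach mentioned in ( http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/state-checkpoints-dir-td17921.html ), but it didn't work. This is what I did: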

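// Load the global Flink configuration and set the checkpoint directory on it
Configuration cfg = GlobalConfiguration.loadConfiguration();
cfg.setString("state.checkpoints.dir", "file:///tmp/checkpoints/state");
// Retain externalized checkpoints when the job is cancelled
// (note: cfg is never passed to env, so the locally started cluster does not see it)
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);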

This is the error message shown in the IDE:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Failed to submit job ef7050e2308a4787d983d80f3c07f55c (Long Taxi Rides (checkpointed))
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1325)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:447)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:211)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:478)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:291)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1277)
... 19 more

Process finished with exit code 1

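(Q2) The checkpointing documentation ( https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/checkpointing.html ) says: "This way, you will have a checkpoint around to resume from if your job fails." What about a cancelled job? Will the new job continue from the existing checkpoint, or start over with its own checkpoints?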

Best answer

You can control whether externalized checkpoints are deleted when a job is cancelled. If you want to keep them, do this:

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

For more information, see the docs.
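The answer does not address Q1 directly. A likely explanation, offered as an assumption rather than a confirmed diagnosis: the `Configuration` built with `GlobalConfiguration` in the question is never handed to the execution environment, so the embedded JobManager that the IDE starts never sees `state.checkpoints.dir`. The minimal sketch below, assuming Flink 1.4-era APIs, passes the setting through `StreamExecutionEnvironment.createLocalEnvironment(int, Configuration)` instead. The class name `LocalExternalizedCheckpoints`, the helper `buildConfig`, the 10-second interval, the path and the `generateSequence` placeholder pipeline are illustrative, and these APIs may differ in later Flink releases.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical example class: runs a checkpointed job from the IDE with externalized checkpoints.
public class LocalExternalizedCheckpoints {

    /** Builds the configuration that the embedded (IDE) cluster should use. */
    static Configuration buildConfig(String checkpointDir) {
        Configuration cfg = new Configuration();
        // Same key that would otherwise be set in flink-conf.yaml
        cfg.setString("state.checkpoints.dir", checkpointDir);
        return cfg;
    }

    public static void main(String[] args) throws Exception {
        Configuration cfg = buildConfig("file:///tmp/checkpoints/state");

        // Unlike getExecutionEnvironment(), this factory hands cfg to the local mini cluster
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, cfg);

        env.enableCheckpointing(10_000L); // checkpoint every 10 s (illustrative interval)
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Placeholder pipeline: a practically unbounded, checkpointable sequence source;
        // only every millionth number is printed to keep the output readable
        env.generateSequence(0, Long.MAX_VALUE)
           .filter(n -> n % 1_000_000 == 0)
           .print();

        env.execute("Externalized checkpoints from the IDE");
    }
}
```

Run `main` from the IDE, let a few checkpoints complete, then stop the job; with `RETAIN_ON_CANCELLATION`, the checkpoint metadata should remain under `/tmp/checkpoints/state`.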

To resume from an externalized checkpoint, do this (it works the same way as resuming from a savepoint):
$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
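To fill in `:checkpointMetaDataPath`, you need the retained metadata file under `state.checkpoints.dir`. Its file name and directory layout vary between Flink versions, so the hypothetical helper below (`LatestCheckpointFinder`, not part of Flink) simply returns the most recently modified file whose name starts with a prefix you supply. It is a sketch using only the JDK; check the actual file names in your checkpoint directory before relying on it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical helper (not a Flink API): finds the newest file matching a name prefix.
public class LatestCheckpointFinder {

    /** Most recently modified regular file under dir (recursively) whose name starts with namePrefix. */
    public static Optional<Path> findLatest(Path dir, String namePrefix) throws IOException {
        if (!Files.isDirectory(dir)) {
            return Optional.empty(); // nothing retained (or wrong directory)
        }
        try (Stream<Path> files = Files.walk(dir)) {
            return files
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().startsWith(namePrefix))
                    .max(Comparator.comparingLong(LatestCheckpointFinder::lastModifiedMillis));
        }
    }

    private static long lastModifiedMillis(Path p) {
        try {
            return Files.getLastModifiedTime(p).toMillis();
        } catch (IOException e) {
            return Long.MIN_VALUE; // unreadable files are treated as oldest
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Paths.get(args.length > 0 ? args[0] : "/tmp/checkpoints/state");
        String prefix = args.length > 1 ? args[1] : ""; // empty prefix matches every file
        Optional<Path> latest = findLatest(dir, prefix);
        // Print the -s argument for `bin/flink run`, or a notice if nothing matched
        System.out.println(latest.map(p -> "-s " + p.toAbsolutePath()).orElse("no checkpoint metadata found"));
    }
}
```

The printed `-s <path>` can then be used in `bin/flink run -s :checkpointMetaDataPath [:runArgs]`.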

A similar question, "apache-flink - Two questions about Flink externalized checkpoints", can be found on Stack Overflow: https://stackoverflow.com/questions/49712817/
