gpt4 book ai didi

akka - 从代码中取消 Apache Flink 作业

转载 作者:行者123 更新时间:2023-12-01 01:53:46 26 4
gpt4 key购买 nike

我想从代码中停止/取消 flink 作业。这是在我的集成测试中,我向我的 flink 作业提交任务并检查结果。当作业异步运行时,即使测试失败/通过,它也不会停止。我想在测试结束后停止工作。

我尝试了一些我在下面列出的东西:

  • 获取jobmanager Actor
  • 获取正在运行的作业
  • 对于每个正在运行的作业,向作业管理器发送取消请求

  • 这当然没有运行,但我不确定 jobmanager actorref 是错误的还是缺少其他东西。

    我得到的错误是:[flink-akka.actor.default-dispatcher-5] [akka://flink/user/jobmanager_1] Message [org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobsStatus$] from Actor[ akka://flink/temp/$a] 到 Actor[akka://flink/user/jobmanager_1] 未交付。 [1] 遇到死信。可以使用配置设置“akka.log-dead-letters”和“akka.log-dead-letters-during-shutdown”关闭或调整此日志记录

    这意味着要么作业管理器 actor ref 错误,要么发送给它的消息不正确。

    代码如下所示:
    val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this path
    val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url
    val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS))
    try {
    val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
    if(result.isInstanceOf[RunningJobsStatus]){
    val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
    val itr = runningJobs.iterator()
    while(itr.hasNext){
    val jobId = itr.next().getJobId
    val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)));
    try {
    Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
    }
    catch {
    case e : Exception =>"Canceling the job with ID " + jobId + " failed." + e
    }

    }
    }
    }
    catch{
    case e : Exception => "Could not retrieve running jobs from the JobManager." + e
    }

    }

    有人可以检查这是否是正确的方法吗?

    编辑 :
    要完全停止Job,需要按照先TaskManager 再JobManager 的顺序停止TaskManager 和JobManager。

    最佳答案

    您正在创建一个新的 ActorSystem然后试着找一个名字叫 /user/jobmanager_1 的 Actor 在同一个 Actor 系统中。这是行不通的,因为实际的作业管理器将在不同的 ActorSystem 中运行。 .

    如果您想获得 ActorRef对于真正的工作经理,您必须使用相同的 ActorSystem用于选择(然后您可以使用本地地址)或者您已找到作业管理器参与者的远程地址。远程地址的格式为 akka.tcp://flink@[address_of_actor_system]/user/jobmanager_[instance_number] .如果您有权访问 FlinkMiniCluster那么你可以使用 leaderGateway promise 获得现任领导的ActorGateway .

    关于akka - 从代码中取消 Apache Flink 作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42139932/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com