gpt4 book ai didi

apache-flink - Apache Flink JobListener 无法正常工作

转载 作者:行者123 更新时间:2023-12-05 07:02:11 26 4
gpt4 key购买 nike

我在flink 1.11.1写了一个flink batch job。作业成功完成后,我想做一些类似调用 http 服务的事情。

我添加了一个简单的作业监听器来 Hook 作业状态。问题是当 kafka 接收器运算符(operator)抛出错误时,不会触发作业监听器。我希望当我的工作失败时,它应该触发我的工作监听器并打印失败日志。

我如何确定工作是否成功完成?

我们将不胜感激。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerJobListener(new JobListener {
override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
if (throwable == null) {
log.info("SUCCESS")
} else {
log.info("FAIL")
}
}

override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {

if (throwable == null) {
log.info("SUCCESS")
} else {
log.info("FAIL")
}
}
})

env.createInput(input)
.filter(r => Option(r.token).getOrElse("").nonEmpty)
.addSink(kafkaProducer)

最佳答案

如果您尝试在集群上运行该作业,您可以使用您的作业 ID 在控制台中查看您的记录器消息和标准输出。请引用随附的屏幕截图,

如果您在本地集群上运行,则默认 url 可以是 http://localhost:8081。

同样,以下不是检查您的工作是否成功的正确方法。

if (throwable == null) {
log.info("SUCCESS")
} else {
log.info("FAIL")
}

enter image description here

关于apache-flink - Apache Flink JobListener 无法正常工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63618060/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com