gpt4 book ai didi

java - Flink 失去领导者并崩溃

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:48:08 24 4
gpt4 key购买 nike

我正在 LocalStreamEnvironment(嵌入式 flink 集群)中运行流处理应用程序。我使用我的代码成功处理了几次特定数据集。昨天我想在对处理逻辑进行一些修改后重新运行应用程序,但是在数据处理大约 3/4 之后,似乎 flink 集群无故崩溃了。查看压缩日志 - 我的评论插入尖括号 <>:

2018-02-09 12:04:05,146 [INFO] from a.b.l.f.MultiS3FileSource in Source: General source (1/1) - inserting 266574 events
2018-02-09 12:10:55,094 [ERROR] from o.a.f.r.c.JobSubmissionClientActor in flink-akka.actor.default-dispatcher-11020 - class org.apache.flink.runtime.client.JobSubmissionClientActor received unknown message:
2018-02-09 12:10:55,245 [WARN] from o.a.f.r.c.JobSubmissionClientActor in flink-akka.actor.default-dispatcher-11019 - Discard message LeaderSessionMessage(7240d925-8573-44e8-996c-fa4658ab0463,02/09/2018 12:10:55 Process -> Detection(7/8) switched to CANCELED ) because there is currently no valid leader id known.
2018-02-09 12:10:55,268 [WARN] from o.a.f.r.c.JobSubmissionClientActor in flink-akka.actor.default-dispatcher-11019 - Discard message LeaderSessionMessage(7240d925-8573-44e8-996c-fa4658ab0463,02/09/2018 12:10:55 Enrichment-> Flat Map(7/8) switched to CANCELED ) because there is currently no valid leader id known.
... <similar messages for all the processing steps>
2018-02-09 12:10:55,509 [ERROR] from o.a.f.s.r.t.StreamTask in PartialAggregations-> Sink: CassandraSink (1/8) - Error during disposal of stream operator.
java.lang.InterruptedException: null <because its interrupting a future>
... <for all of my sinks - these are custom, not the flink cassandra connectors>

第一条信息消息是关于我的来源从 s3 读取数据并将其收集到 flink 中。

之后第一个错误产生于:https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java#L137

并且警告由:https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java#L115 产生

最后一个错误是在我的代码中,但它是由 flink 试图拆除作业引起的,因此它不应该是错误的原始原因。

我可以提供一些额外的信息,但我不确定哪些是相关的。

第一个错误似乎是导致整个崩溃的原因。 JobSubmissionClientActor 怎么可能有一个空的 getLeaderSessionID?如果 flink 以嵌入式方式运行,JobSubmissionClientActor 期望什么样的消息?在我看来,它能够接收的所有消息都是关于提交作业的消息。在嵌入式模式下甚至可以吗?我怎样才能防止这种崩溃?

更新:我想我误解了错误日志。当我再次运行执行时,我得到的事件顺序略有不同。在之前的运行中,我只在处理流时遇到错误,没有明显的流结束原因,因为最后一个错误似乎没有包含在我的日志文件中(虽然它被打印到标准输出)。这个错误在下面,之前的错误与之前运行中的错误类似(围绕处理流的错误)。

[error] Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: JobClientActor seems to have died before the JobExecutionResult could be retrieved.[error]         at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:285)
[error] at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
[error] at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:565)
[error] at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:539)
[error] at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:108)
[error] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
[error] at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
[error] at a.b.l.flink.FlinkIngestPrototype$.run(FlinkIngestPrototype.scala:90)
[error] at a.b.l.flink.FlinkIngestPrototype$.main(FlinkIngestPrototype.scala:43)
[error] at a.b.l.flink.FlinkIngestPrototype.main(FlinkIngestPrototype.scala)
[error] Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
[error] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
[error] at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
[error] at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
[error] at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
[error] at scala.concurrent.Await$.result(package.scala:190)
[error] at scala.concurrent.Await.result(package.scala)
[error] at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:273)
[error] ... 9 more

我已将执行失败追踪到以下原因:

  1. JobClient 对象会向作业客户 actor 发出 ping 信号,看它是否已经完成,如果还没有完成,那么如果他还活着,它就会简单地 ping 他。活着的 ping 是:https://github.com/apache/flink/blob/62a777bc8ddfb4e34d7beaf7091a90b0bcc70c51/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java#L273

  2. 此 ping 超时并向作业执行者发送毒丸,这会导致所有不同的处置错误。

我之前遇到过一些关于 futures 的问题,它们会以不确定的方式被较短的超时中断。我已经对这个问题进行了一些调试,我认为这是因为一些非常长的 GC 暂停(或类似的东西)。超时如何与 GC 暂停同步的图示:https://imgur.com/a/9mMvN .我认为这也可能是超时的原因。这是我的 GC 配置:

"-XX:-UseParallelGC",
"-XX:-UseConcMarkSweepGC",
"-XX:+UseG1GC",

根据大多数消息来源,这应该会导致非常短的 GC 暂停(不到一秒)。有人有过在 flink 中获得很长的 GC 暂停的经验吗?这可能是一个与硬件有关的问题吗?我在 EC2 AWS 实例上运行该应用程序。

最佳答案

正如您所说,这是 GC 暂停的问题,我尝试解决此类问题的方法是:

  1. 减少作业内存需求
  2. 增加系统可用内存
  3. 增加心跳超时,所以它不会在长时间暂停后崩溃

关于java - Flink 失去领导者并崩溃,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48707003/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com