Flink lost its leader and crashed

Posted by 6l7fqoea on 2021-06-25 in Flink

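I am running a stream processing application in a LocalStreamEnvironment (an embedded Flink cluster). I have successfully processed a particular data set with this code several times. After making some changes to the processing logic, I wanted to rerun the application yesterday, but after roughly 3/4 of the data had been processed, the Flink cluster seemed to crash for no apparent reason. Here is a condensed log; my comments are inserted in angle brackets <>: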

2018-02-09 12:04:05,146 [INFO] from a.b.l.f.MultiS3FileSource in Source: General source (1/1) - inserting 266574 events
2018-02-09 12:10:55,094 [ERROR] from o.a.f.r.c.JobSubmissionClientActor in flink-akka.actor.default-dispatcher-11020 - class org.apache.flink.runtime.client.JobSubmissionClientActor received unknown message: 
2018-02-09 12:10:55,245 [WARN] from o.a.f.r.c.JobSubmissionClientActor in flink-akka.actor.default-dispatcher-11019 - Discard message LeaderSessionMessage(7240d925-8573-44e8-996c-fa4658ab0463,02/09/2018 12:10:55 Process -> Detection(7/8) switched to CANCELED ) because there is currently no valid leader id known.
2018-02-09 12:10:55,268 [WARN] from o.a.f.r.c.JobSubmissionClientActor in flink-akka.actor.default-dispatcher-11019 - Discard message LeaderSessionMessage(7240d925-8573-44e8-996c-fa4658ab0463,02/09/2018 12:10:55 Enrichment-> Flat Map(7/8) switched to CANCELED ) because there is currently no valid leader id known.
... <similar messages for all the processing steps>
2018-02-09 12:10:55,509 [ERROR] from o.a.f.s.r.t.StreamTask in PartialAggregations-> Sink: CassandraSink (1/8) - Error during disposal of stream operator.
java.lang.InterruptedException: null <because it's interrupting a future>
... <for all of my sinks - these are custom, not the flink cassandra connectors>

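The first message comes from my source, which reads data from S3 and collects it into Flink.
After that, the first error is produced here: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java#L137
The warnings are produced here: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java#L115
The last error is in my own code, but it is caused by Flink trying to tear down the job, so it should not be the original cause of the failure.
I can provide additional information, but I don't know what is relevant.
The first error seems to be what causes the whole crash. How can the JobSubmissionClientActor return an empty leader session ID from getLeaderSessionID? If Flink is running embedded, what kind of messages does the JobSubmissionClientActor expect at all? As far as I can see, every message it can receive is about submitting a job. Does that apply in embedded mode as well? How can I prevent this crash?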
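Update: I think I misread the error log. When I ran the application again, the order of events was slightly different. In the earlier run I only saw errors while the stream was being processed, with no obvious reason why the stream ended, because the final error apparently was not included in my log file (although it was printed to stdout). That error is shown below; the errors preceding it were similar to those of the earlier run (the errors about stream processing).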

[error] Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: JobClientActor seems to have died before the JobExecutionResult could be retrieved.
[error]         at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:285)
[error]         at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
[error]         at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:565)
[error]         at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:539)
[error]         at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:108)
[error]         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
[error]         at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
[error]         at a.b.l.flink.FlinkIngestPrototype$.run(FlinkIngestPrototype.scala:90)
[error]         at a.b.l.flink.FlinkIngestPrototype$.main(FlinkIngestPrototype.scala:43)
[error]         at a.b.l.flink.FlinkIngestPrototype.main(FlinkIngestPrototype.scala)
[error] Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
[error]         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
[error]         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
[error]         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
[error]         at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
[error]         at scala.concurrent.Await$.result(package.scala:190)
[error]         at scala.concurrent.Await.result(package.scala)
[error]         at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:273)
[error]         ... 9 more

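I found the reason why the execution fails:
The JobClient object waits for the JobClientActor to report that the job has finished; if that does not happen in time, it only pings the actor to check whether it is still alive. The liveness ping is here: https://github.com/apache/flink/blob/62a777bc8ddfb4e34d7beaf7091a90b0bcc70c51/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java#L273
This ping times out and a poison pill is sent to the job executor, which leads to all the different processing errors.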
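The wait-and-ping behaviour described above can be modelled with plain Scala futures. This is a simplified, hypothetical sketch, not Flink's code: the object `AwaitJobResultSketch`, its signature, the `ping` function and the use of `IllegalStateException` are all assumptions. The caller waits up to `askTimeout` for the job result; each time that expires it pings, and it gives up if the ping reply does not arrive within `askTimeout` either. A reply delayed by a stall longer than the timeout is enough to fail the wait while the job itself is still running.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Simplified, hypothetical model of the JobClient wait loop (not Flink's actual code).
object AwaitJobResultSketch {

  /** Waits for `jobResult`. Whenever `askTimeout` elapses without a result, calls `ping`
    * (standing in for the liveness check sent to the JobClientActor) and fails if that
    * reply does not arrive within `askTimeout` either. */
  def awaitJobResult[T](jobResult: Future[T],
                        ping: () => Future[Unit],
                        askTimeout: FiniteDuration): T = {
    while (!jobResult.isCompleted) {
      try {
        Await.ready(jobResult, askTimeout)
      } catch {
        case _: TimeoutException =>
          try {
            Await.result(ping(), askTimeout) // reply received in time: keep waiting
          } catch {
            case e: Exception =>
              // The ping failed or timed out; only give up if the job has not finished meanwhile.
              if (!jobResult.isCompleted)
                throw new IllegalStateException(
                  "JobClientActor seems to have died before the JobExecutionResult could be retrieved.", e)
          }
      }
    }
    jobResult.value.get.get // completed: return the value, or rethrow the job's own failure
  }
}
```

With a 100 ms timeout, a ping that takes 200 ms to answer makes the wait fail even though the job would have finished; with a 300 ms timeout the same ping succeeds and the result is returned. That is the reasoning behind raising the timeout when pauses are longer than it.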
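I have had problems with futures before, where they were interrupted non-deterministically by a shorter timeout. I debugged that issue a little, and I believe it was caused by some very long GC pauses (or something similar). Here is an illustration of how the timeouts line up with the GC pauses: https://imgur.com/a/9mmvn. I think this may also be the cause of this timeout. This is my GC configuration: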

"-XX:-UseParallelGC",
"-XX:-UseConcMarkSweepGC",
"-XX:+UseG1GC",

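According to most sources, this should result in very short GC pauses (less than a second). Has anyone experienced long GC pauses in Flink? Could this be a hardware-related issue? I am running the application on an AWS EC2 instance.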
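To check whether such stalls actually happen, and whether the GC accounts for them, a small in-process pause detector can help. This is a hypothetical helper (`PauseDetector` and its methods are illustrative names, not Flink or JDK APIs); it only uses the standard `java.lang.management` GC beans and `Thread.sleep`. The idea: a thread that asks to sleep for a short interval and wakes up much later was unable to run in the meantime, whatever the reason.

```scala
import java.lang.management.ManagementFactory

// Hypothetical diagnostic helpers (not part of Flink) for checking whether the JVM stalls.
object PauseDetector {

  /** Cumulative GC time in ms reported by all collectors since JVM start
    * (collectors that report -1, i.e. "undefined", are counted as 0). */
  def totalGcMillis(): Long = {
    val it = ManagementFactory.getGarbageCollectorMXBeans.iterator()
    var total = 0L
    while (it.hasNext) total += math.max(it.next().getCollectionTime, 0L)
    total
  }

  /** Sleeps `intervalMs` a total of `samples` times and returns the largest overshoot in ms.
    * A large overshoot means this thread could not run: a GC pause, a VM or hypervisor
    * stall, or CPU starvation would all show up here. */
  def maxStallMillis(intervalMs: Long, samples: Int): Long = {
    var worst = 0L
    for (_ <- 1 to samples) {
      val start = System.nanoTime()
      Thread.sleep(intervalMs)
      val elapsedMs = (System.nanoTime() - start) / 1000000L
      worst = math.max(worst, elapsedMs - intervalMs)
    }
    worst
  }
}
```

Running `maxStallMillis` in a loop on a background thread next to the job, and logging the change in `totalGcMillis()` over the same window, would show whether long stalls line up with GC time. A stall without a matching increase in GC time would point to something outside the heap, such as the host, which bears on the EC2 question.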

Answer 1 (by j8ag8udp)

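As you said, this is a GC-pause problem. The ways I have tried to deal with this kind of problem are:
Reduce the job's memory requirements.
Increase the memory available to the system.
Increase the heartbeat timeout, so that a long pause does not cause a crash.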
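For the embedded cluster in the question, the timeout in the stack trace (`Futures timed out after [10000 milliseconds]`) appears to correspond to Flink's `akka.ask.timeout`, whose default in the Akka-based Flink 1.x runtime is 10 s. A minimal configuration sketch, assuming that key and assuming the `Configuration` passed to `createLocalEnvironment` reaches the embedded mini cluster; the object name `LocalEnvWithLongerAskTimeout` is hypothetical, the 60 s value is an arbitrary example, and newer Flink releases may use different keys:

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical helper; assumes Akka-based Flink 1.x configuration keys.
object LocalEnvWithLongerAskTimeout {
  def create(parallelism: Int): StreamExecutionEnvironment = {
    val conf = new Configuration()
    // Timeout for Akka asks, assumed here to cover the JobClient's liveness ping.
    // Default "10 s" matches the 10000 ms in the stack trace; 60 s is only an example.
    conf.setString("akka.ask.timeout", "60 s")
    StreamExecutionEnvironment.createLocalEnvironment(parallelism, conf)
  }
}
```

Raising the timeout only buys tolerance for pauses; reducing memory pressure (the first two suggestions) addresses the pauses themselves.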
