Flink job execution exception: akka.client.timeout

vxbzzdmp posted on 2021-06-24 in Flink

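I am using Flink v1.4.0.
I am trying to run a job that uses the DataSet API from IntelliJ. Note that the same job runs fine when submitted through the Flink UI. To run the job, I first have to specify the amount of data to process through an environment variable. When the amount is relatively small, the job runs fine. As it gets larger, I start getting the following error: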

ERROR StatusLogger Log4j2 could not find a logging implementation. Please add log4j-core to the classpath. Using SimpleLogger to log to the console...
31107 [main] ERROR com.company.someLib.SomeClass - Error executing pipeline
org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:565)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:539)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:193)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at com.ubs.digital.comms.graph.emailanalyser.EmailAnalyserPipeline.lambda$runPipeline$1(EmailAnalyserPipeline.java:120)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at com.ubs.digital.comms.graph.emailanalyser.EmailAnalyserPipeline.runPipeline(EmailAnalyserPipeline.java:87)
at com.ubs.digital.comms.graph.emailanalyser.EmailAnalyserPipeline.main(EmailAnalyserPipeline.java:65)
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.

I can see that the suggestion is:

You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.

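But I suspect there is more to the problem than that. To find out, I first need to configure akka.client.timeout. How do I do that in IntelliJ? How long should the timeout be?
Also, what exactly is causing this? Do I need to increase the heap memory? Thanks.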

xmq68pz9 #1

You can set this property through the Flink configuration file. See https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#distributed-coordination-via-akka
So, in flink-conf.yaml, you could add, for example:

akka.client.timeout: 10min

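But it looks like the data is being processed in the wrong place. Are you perhaps processing it in a constructor instead of in the map or run function?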
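One way this could explain a submission timeout that grows with the data volume: user functions are serialized with Java serialization when the job is submitted, so anything a constructor loads into a non-transient field travels with the submission. The sketch below uses only the JDK to show the effect. EagerLookup, LazyLookup, and the open() method are hypothetical stand-ins for a user function and its initialization hook, not Flink classes, and whether the job in the question actually does this is an assumption.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a user function that loads its data in the constructor.
final class EagerLookup implements Serializable {
    private static final long serialVersionUID = 1L;
    // Filled at construction time, so it is serialized together with the function.
    private final long[] table;

    EagerLookup(int size) {
        table = new long[size];
        for (int i = 0; i < size; i++) {
            table[i] = i;
        }
    }

    long map(int key) {
        return table[key];
    }
}

// Hypothetical stand-in for a user function that defers loading until it runs.
final class LazyLookup implements Serializable {
    private static final long serialVersionUID = 1L;
    private final int size;
    // transient: skipped by Java serialization and rebuilt in open().
    private transient long[] table;

    LazyLookup(int size) {
        this.size = size;
    }

    // Plays the role of an initialization hook that runs where the data is processed.
    void open() {
        table = new long[size];
        for (int i = 0; i < size; i++) {
            table[i] = i;
        }
    }

    long map(int key) {
        return table[key];
    }
}

final class SerializedSizeDemo {
    // Returns the number of bytes Java serialization produces for the object.
    static int serializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        int size = 1_000_000;
        System.out.println("eager: " + serializedSize(new EagerLookup(size)) + " bytes");
        System.out.println("lazy:  " + serializedSize(new LazyLookup(size)) + " bytes");
    }
}
```

The eager variant serializes every element of its table, while the lazy one serializes little more than its size field. If the real job follows the eager pattern, moving the loading out of the constructor may shrink the submission regardless of the timeout value.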

idv4meu8 #2

I was able to figure it out, and it was not that hard. All I had to do was go to Run > Edit Configurations and, under the Configuration tab, add the following to the Program arguments field:

-Dakka.client.timeout:600s
-Dakka.ask.timeout:600s

I should note, however, that this did not completely solve the problem I was facing.
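A possible reason the flags had limited effect: the JVM only turns -D flags into system properties when they are written as -Dkey=value and passed as VM options. Text typed into Program arguments is handed to main(String[] args) unchanged. The JDK-only sketch below reports where such a flag ended up. JvmFlagCheck and misplacedDefines are hypothetical names, and whether Flink's local environment reads these keys from system properties at all is not established by this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class JvmFlagCheck {
    // Collects -Dkey=value entries that arrived as ordinary program arguments.
    static Map<String, String> misplacedDefines(String[] args) {
        Map<String, String> found = new LinkedHashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            // Require "-D", a non-empty key, and an '=' separator.
            if (arg.startsWith("-D") && eq > 2) {
                found.put(arg.substring(2, eq), arg.substring(eq + 1));
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Non-null only if -Dakka.client.timeout=... was given under VM options.
        System.out.println("system property: " + System.getProperty("akka.client.timeout"));
        // Non-empty if -Dkey=value flags were typed into Program arguments instead.
        System.out.println("found in args:   " + misplacedDefines(args));
    }
}
```

Running this from the same IntelliJ run configuration shows which field the flags are really being read from. Another route for IDE runs may be to build a Flink Configuration in code and pass it to ExecutionEnvironment.createLocalEnvironment(Configuration); that is not shown here and should be checked against the 1.4 API.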
