I'm using Flink v1.4.0.
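I'm trying to run a job with the DataSet API through IntelliJ. Note that the same job runs fine when I submit it through the Flink UI. To run the job, I first have to specify the amount of data to process via an environment variable. When the amount is relatively small, the job runs fine, but as it gets larger I start getting the following error: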
ERROR StatusLogger Log4j2 could not find a logging implementation. Please add log4j-core to the classpath. Using SimpleLogger to log to the console...
31107 [main] ERROR com.company.someLib.SomeClass - Error executing pipeline
org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:565)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:539)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:193)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at com.ubs.digital.comms.graph.emailanalyser.EmailAnalyserPipeline.lambda$runPipeline$1(EmailAnalyserPipeline.java:120)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at com.ubs.digital.comms.graph.emailanalyser.EmailAnalyserPipeline.runPipeline(EmailAnalyserPipeline.java:87)
at com.ubs.digital.comms.graph.emailanalyser.EmailAnalyserPipeline.main(EmailAnalyserPipeline.java:65)
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
I can see that the suggestion is:
You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
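But I suspect there is more to the problem than that. Still, to get there I first need to configure akka.client.timeout. How do I do this in IntelliJ? How long should the timeout be?
Also, what exactly is causing this? Do I need to increase the heap memory? Thanks.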
2 answers
You can set this property in the Flink configuration file. See https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#distributed-coordination-via-akka
So, for example, you can add the following to flink-conf.yaml:
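But it looks like the data is being processed in the wrong place. Could it be that you are doing this in the constructor rather than in the map or run function?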
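A stdlib-only Java sketch of why the location matters, assuming (as in Flink's DataSet API) that user functions are Java-serialized into the job that gets submitted to the JobManager: data built in a function's constructor becomes part of the serialized function, while data built later in a transient field (in a Flink RichFunction this would happen in open()) is not shipped. The class names EagerFunction and LazyFunction and the record contents are hypothetical; this is not Flink code, it only compares serialized sizes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class ClosureSizeDemo {

    // Hypothetical function that loads all of its data in the constructor,
    // so the data becomes part of the serialized object.
    static class EagerFunction implements Serializable {
        private final List<String> records = new ArrayList<>();

        EagerFunction(int count) {
            for (int i = 0; i < count; i++) {
                records.add("record-" + i);
            }
        }
    }

    // Hypothetical function that only stores how much to load. The data lives in a
    // transient field filled in open() (the RichFunction lifecycle hook in Flink),
    // so it is not written during serialization.
    static class LazyFunction implements Serializable {
        private final int count;
        private transient List<String> records;

        LazyFunction(int count) {
            this.count = count;
        }

        void open() {
            records = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                records.add("record-" + i);
            }
        }
    }

    // Number of bytes Java serialization produces for the given object.
    static int serializedSize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        int count = 100_000;
        System.out.println("eager: " + serializedSize(new EagerFunction(count)) + " bytes");
        System.out.println("lazy:  " + serializedSize(new LazyFunction(count)) + " bytes");
    }
}
```

The eager variant grows with the amount of data (over a megabyte for 100,000 records here), while the lazy one stays at a few dozen bytes regardless of count. That is one plausible way a job that submits fine with small inputs starts hitting the submission timeout as the input grows.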
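I was able to figure it out, and it wasn't that hard. All I had to do was go to Run > Edit Configurations and, under the Configuration tab, add the following to the Program arguments field:
However, I should note that this did not fully solve the problem I was facing.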