我有一份flink的工作,它有以下操作序列
自定义源(kinesis)->异步等待操作符->过滤器->Map->接收器
在async wait操作符中,我查询elasticsearch并丰富从kinesis接收的事件。在asyncfunction中,我在open方法中获得一个transportclient,并在asyncinvoke方法中使用searchAPI。
此设置在本地群集上运行良好。但是当我尝试在emr上运行它时,作业被正确提交了。当一个事件来自动觉时,它不能用下面的错误丰富这个事件。错误的原因可能是什么。如何调试?
Caused by: NoNodeAvailableException[None of the configured nodes are available: []]
at org.elasticsearch.client.transport.TransportClientNodesService.ensureNodesAreAvailable(TransportClientNodesService.java:290)
at org.elasticsearch.client.transport.TransportClientNodesService.execute(TransportClientNodesService.java:207)
at org.elasticsearch.client.transport.support.TransportProxyClient.execute(TransportProxyClient.java:55)
at org.elasticsearch.client.transport.TransportClient.doExecute(TransportClient.java:288)
at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:359)
at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:86)
at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:56)
at com.a.p.flink.AsyncESRequest.asyncInvoke(AsyncESRequest.java:54)
at com.a.p.flink.AsyncESRequest.asyncInvoke(AsyncESRequest.java:23)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:230)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:486)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:263)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:209)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
另一个问题是,在emr集群中运行作业时,在哪里可以看到正在运行的作业的日志?它们似乎没有被记录在主日志中。我正在使用每个作业群集。
暂无答案!
目前还没有任何答案,快来回答吧!