Flink async I/O not working with Elasticsearch on an EMR cluster

Posted by 42fyovps on 2021-06-25 in Flink

I have a Flink job with the following sequence of operations:
Custom source (Kinesis) -> async wait operator -> filter -> map -> sink
In the async wait operator, I query Elasticsearch and enrich the events received from Kinesis. In the AsyncFunction, I obtain a TransportClient in the open method and use the search API in the asyncInvoke method.
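The enrichment step described above presumably follows Flink's `RichAsyncFunction` shape: create the client once in `open()`, then in `asyncInvoke()` start a non-blocking lookup and complete the `ResultFuture` from its callback. The sketch below mirrors that shape without Flink or Elasticsearch on the classpath, so it is an illustration of the pattern under assumptions, not the poster's code. `Lookup` and `ResultSink` are hypothetical stand-ins for the Elasticsearch client and Flink's `ResultFuture`, and the `event|doc` output format is made up for the example.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class AsyncEnrichSketch {
    // Hypothetical stand-in for the Elasticsearch client: asynchronously finds a document for a key.
    public interface Lookup {
        CompletableFuture<String> find(String key);
    }

    // Hypothetical stand-in for Flink's ResultFuture<OUT>.
    public interface ResultSink<T> {
        void complete(Collection<T> results);
        void completeExceptionally(Throwable error);
    }

    private Lookup client;

    // Mirrors RichAsyncFunction.open(): the client is created once per parallel instance.
    public void open(Lookup lookup) {
        this.client = lookup;
    }

    // Mirrors asyncInvoke(): do not block; complete the sink from the lookup's callback.
    public void asyncInvoke(String event, ResultSink<String> sink) {
        client.find(event).whenComplete((doc, err) -> {
            if (err != null) {
                // Forward the failure instead of leaving the record pending until the async timeout.
                sink.completeExceptionally(err);
            } else {
                // Emit exactly one enriched record for the incoming event.
                sink.complete(Collections.singletonList(event + "|" + doc));
            }
        });
    }
}
```

Forwarding lookup errors to `completeExceptionally` should make a connectivity problem surface as a task failure with its original cause, rather than as records that silently wait for the async timeout.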
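This setup works fine on a local cluster. But when I run it on EMR, the job is submitted correctly, yet when an event arrives from Kinesis it cannot be enriched and fails with the error below. What could be causing this error, and how can I debug it?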

Caused by: NoNodeAvailableException[None of the configured nodes are available: []]
    at org.elasticsearch.client.transport.TransportClientNodesService.ensureNodesAreAvailable(TransportClientNodesService.java:290)
    at org.elasticsearch.client.transport.TransportClientNodesService.execute(TransportClientNodesService.java:207)
    at org.elasticsearch.client.transport.support.TransportProxyClient.execute(TransportProxyClient.java:55)
    at org.elasticsearch.client.transport.TransportClient.doExecute(TransportClient.java:288)
    at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:359)
    at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:86)
    at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:56)
    at com.a.p.flink.AsyncESRequest.asyncInvoke(AsyncESRequest.java:54)
    at com.a.p.flink.AsyncESRequest.asyncInvoke(AsyncESRequest.java:23)
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:230)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:486)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:263)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:209)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
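`NoNodeAvailableException[None of the configured nodes are available: []]` means the TransportClient has no connected Elasticsearch node at the moment the search is executed. One possible first debugging step, offered here as an assumption rather than a known fix, is to check whether the EMR hosts that run the TaskManagers can open a plain TCP connection to the host and transport port configured in `open()`. The class name `EsPortProbe`, the `localhost` default and the `9300` default port below are hypothetical placeholders; pass the values your job actually uses.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class EsPortProbe {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs, false otherwise.
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unresolvable host names.
            System.err.println("Cannot reach " + host + ":" + port + " -> " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical defaults: replace with the host and port the job's TransportClient is given.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9300;
        System.out.println(host + ":" + port + " reachable = " + canConnect(host, port, 3000));
    }
}
```

Running this on a core or task node rather than only on the master matters, because that is where the TaskManager containers, and therefore the async operator, execute.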

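Another question: when running a job on an EMR cluster, where can I see the logs of the running job? They don't seem to be written to the master logs. I'm using a per-job cluster.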
