我正在玩flink+elasticsearch 5接收器,使用x-pack身份验证。
我第一次遇到这个错误flink xpack elasticsearch 5 elasticsearchsecurityexception缺少身份验证
所以我修正了覆盖es sink函数的问题。
我现在的问题是,当我尝试在flink上运行作业(使用jar)时,我遇到了这个错误。
Caused by: java.lang.NoSuchMethodError: io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;
at org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)
at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:422)
at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:93)
at org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:1058)
at org.elasticsearch.transport.TcpTransport.sendRequestToChannel(TcpTransport.java:1040)
at org.elasticsearch.transport.TcpTransport.executeHandshake(TcpTransport.java:1555)
at org.elasticsearch.transport.TcpTransport.openConnection(TcpTransport.java:502)
at org.elasticsearch.transport.TcpTransport.connectToNode(TcpTransport.java:460)
at org.elasticsearch.transport.TransportService.connectToNode(TransportService.java:318)
at org.elasticsearch.client.transport.TransportClientNodesService$SimpleNodeSampler.doSample(TransportClientNodesService.java:408)
at org.elasticsearch.client.transport.TransportClientNodesService$NodeSampler.sample(TransportClientNodesService.java:354)
at org.elasticsearch.client.transport.TransportClientNodesService.addTransportAddresses(TransportClientNodesService.java:195)
at org.elasticsearch.client.transport.TransportClient.addTransportAddress(TransportClient.java:312)
at com.ceptinel.flink.sink.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:45)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:272)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
at java.lang.Thread.run(Thread.java:745)
看起来flink和es客户机之间的库io.netty有冲突(不确定是x-pack-transport还是连接器本身)
有什么办法可以避免这种冲突吗?
谢谢,路易斯
2条答案
按热度按时间x4shl7ld1#
我使用gradle而不是maven,但是过程差不多。
万一,如果你仍然有这个问题(这是非常不可能的),我已经尝试解决,解决方案似乎工作。
这是我的依赖项块:
然后使用以下方法将io.netty着色到不同的包中:
注意:捕获是为了排除来自
如果你试图遮荫不排除荨麻来自Flink流它不会改变什么。因此,排除flink流媒体库中的netty是非常重要的。
3wabscal2#
在apache flink中修复之前(通过隐藏netty依赖关系),我建议您将用户jar中的netty隐藏到不同的名称空间中。
如果您使用apachemaven构建项目,那么可以使用
maven-shade-plugin
去做吧。另请查看有关flink中着色的文档页:https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_classloading.html#resolving-依赖关系与使用maven shade插件的flink冲突