flink with elasticsearch io.netty库的5个接收器冲突

xa9qqrwz  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(424)

我正在玩flink+elasticsearch 5接收器,使用x-pack身份验证。
我第一次遇到这个错误flink xpack elasticsearch 5 elasticsearchsecurityexception缺少身份验证
所以我修正了覆盖es sink函数的问题。
我现在的问题是,当我尝试在flink上运行作业(使用jar)时,我遇到了这个错误。

Caused by: java.lang.NoSuchMethodError: io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;
    at org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)
    at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:422)
    at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:93)
    at org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:1058)
    at org.elasticsearch.transport.TcpTransport.sendRequestToChannel(TcpTransport.java:1040)
    at org.elasticsearch.transport.TcpTransport.executeHandshake(TcpTransport.java:1555)
    at org.elasticsearch.transport.TcpTransport.openConnection(TcpTransport.java:502)
    at org.elasticsearch.transport.TcpTransport.connectToNode(TcpTransport.java:460)
    at org.elasticsearch.transport.TransportService.connectToNode(TransportService.java:318)
    at org.elasticsearch.client.transport.TransportClientNodesService$SimpleNodeSampler.doSample(TransportClientNodesService.java:408)
    at org.elasticsearch.client.transport.TransportClientNodesService$NodeSampler.sample(TransportClientNodesService.java:354)
    at org.elasticsearch.client.transport.TransportClientNodesService.addTransportAddresses(TransportClientNodesService.java:195)
    at org.elasticsearch.client.transport.TransportClient.addTransportAddress(TransportClient.java:312)
    at com.ceptinel.flink.sink.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:45)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:272)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
    at java.lang.Thread.run(Thread.java:745)

看起来flink和es客户机之间的库io.netty有冲突(不确定是x-pack-transport还是连接器本身)
有什么办法可以避免这种冲突吗?
谢谢,路易斯

x4shl7ld

x4shl7ld1#

我使用gradle而不是maven,但是过程差不多。
万一,如果你仍然有这个问题(这是非常不可能的),我已经尝试解决,解决方案似乎工作。
这是我的依赖项块:

dependencies {
    ....
    compile(group: 'org.apache.flink', name: 'flink-streaming-java_2.10', version: project.flinkStreamJavaVersion)
            {
                exclude group: 'io.netty'
            }
    compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.10', version: project.flinkKafkaConnectorVersion
    compile group: 'org.apache.flink', name: 'flink-connector-elasticsearch5_2.10', version: project.flinkElasticConnectorVersion

    ....
}

然后使用以下方法将io.netty着色到不同的包中:

shadowJar {
    ....
    relocate 'io.netty', 'shaded.io.netty'
    ....
}

注意:捕获是为了排除来自

flink-streaming-java_2.10

如果你试图遮荫不排除荨麻来自Flink流它不会改变什么。因此,排除flink流媒体库中的netty是非常重要的。

3wabscal

3wabscal2#

在apache flink中修复之前(通过隐藏netty依赖关系),我建议您将用户jar中的netty隐藏到不同的名称空间中。
如果您使用apachemaven构建项目,那么可以使用 maven-shade-plugin 去做吧。另请查看有关flink中着色的文档页:https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_classloading.html#resolving-依赖关系与使用maven shade插件的flink冲突

相关问题