DataStax sink connector (kafka-connect-dse) throws javax.net.ssl.SSLException: SSLEngine closed already

Posted by d5vmydt9 on 2021-06-05 in Kafka

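We want the DSE sink connector to connect to the Cassandra distribution (3.11.3.5112) that ships with DSE 5.1.2. Kafka, Kafka Connect, and Cassandra each run on a separate Linux box. We added kafka-connect-dse-1.3.1.jar to Kafka Connect as a plugin, and this is the configuration we registered for the connector: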

{
  "name": "dse-sink-connector-for-orders",
  "config": {
    "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
    "topics": "action_log",
    "tasks.max": "1",
    "ssl.provider": "JDK",
    "ssl.truststore.path": "<<Path_To_trust_store>>",
    "ssl.truststore.password": "<<password>>",
    "auth.provider": "DSE",
    "auth.username": "<<username>>",
    "auth.password": "<<password>>",
    "contactPoints": "172.21.96.64",
    "loadBalancing.localDc": "DC1",
    "port": "9042",
    "topic.action_log.testc_netspend.dse_orders.consistencyLevel": "LOCAL_ONE",
    "topic.action_log.testc_netspend.dse_orders.mapping": "id=value.ACTION_ID,created=value.USER_ID, price=value.ACTION_ID, product=value.USER_ID, qty=value.ACTION_ID"
  }
}

When we hit https://172.21.96.64:8083/connectors/dse-sink-connector-for-orders/status, it shows the connector as running but the task as failed. We found the following error in the logs:

[dse-sink-connector-for-orders|task-0] WorkerSinkTask{id=dse-sink-connector-for-orders-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
com.datastax.oss.driver.api.core.AllNodesFailedException: Could not reach any contact point, make sure you've provided valid addresses (showing first 3 nodes, use getAllErrors() for more): Node(endPoint=test-db-cass0a02dc1/172.21.96.64:9042, hostId=null, hashCode=5fa99a2e): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|id: 0xc4c37855, L:/172.21.117.41:52221 - R:test-db-cass0a02dc1/172.21.96.64:9042] Protocol initialization request, step 1 (OPTIONS): failed to send request (javax.net.ssl.SSLException: SSLEngine closed already)]
    at com.datastax.oss.driver.api.core.AllNodesFailedException.copy(AllNodesFailedException.java:141)
    at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)
    at com.datastax.oss.driver.api.core.session.SessionBuilder.build(SessionBuilder.java:612)
    at com.datastax.kafkaconnector.state.LifeCycleManager.buildCqlSession(LifeCycleManager.java:521)
    at com.datastax.kafkaconnector.state.LifeCycleManager.lambda$startTask$0(LifeCycleManager.java:109)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at com.datastax.kafkaconnector.state.LifeCycleManager.startTask(LifeCycleManager.java:105)
    at com.datastax.kafkaconnector.DseSinkTask.start(DseSinkTask.java:74)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:300)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|id: 0xc4c37855, L:/172.21.117.41:52221 - R:test-db-cass0a02dc1/172.21.96.64:9042] Protocol initialization request, step 1 (OPTIONS): failed to send request (javax.net.ssl.SSLException: SSLEngine closed already)
        at com.datastax.oss.driver.internal.core.channel.ProtocolInitHandler$InitRequest.fail(ProtocolInitHandler.java:342)
        at com.datastax.oss.driver.internal.core.channel.ChannelHandlerRequest.writeListener(ChannelHandlerRequest.java:87)
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:493)
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:472)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
        at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:531)
        at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:111)
        at io.netty.util.internal.PromiseNotificationUtil.tryFailure(PromiseNotificationUtil.java:64)
        at io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:57)
        at io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:31)
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:474)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
        at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:531)
        at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:111)
        at io.netty.util.internal.PromiseNotificationUtil.tryFailure(PromiseNotificationUtil.java:64)
        at io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:57)
        at io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:31)
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:474)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
        at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:531)
        at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:111)
        at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:835)
        at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797)
        at io.netty.handler.ssl.SslHandler.handleUnwrapThrowable(SslHandler.java:1254)
        at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1230)
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1271)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more
    Caused by: javax.net.ssl.SSLException: SSLEngine closed already
        at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:834)
        ... 23 more
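
The status check described above (connector running, task failed) can also be scripted. The following is a minimal sketch, assuming the usual shape of a Kafka Connect status response: a `tasks` list whose entries carry `id`, `state`, and, for failed tasks, a `trace`. The helper names `failed_tasks` and `fetch_status` and the sample data are illustrative and not part of the original post.

```python
import json
from urllib.request import urlopen


def failed_tasks(status):
    """Return (task id, trace) pairs for every task whose state is FAILED."""
    return [
        (task["id"], task.get("trace", ""))
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]


def fetch_status(base_url, connector):
    """GET <base_url>/connectors/<connector>/status and decode the JSON body."""
    with urlopen(f"{base_url}/connectors/{connector}/status", timeout=10) as resp:
        return json.load(resp)


if __name__ == "__main__":
    # Hypothetical sample shaped like a status response; not real output.
    sample = {
        "name": "dse-sink-connector-for-orders",
        "connector": {"state": "RUNNING"},
        "tasks": [{"id": 0, "state": "FAILED", "trace": "AllNodesFailedException ..."}],
    }
    for task_id, trace in failed_tasks(sample):
        first_line = trace.splitlines()[0] if trace else ""
        print(f"task {task_id} failed: {first_line}")
```

Against a live worker, `failed_tasks(fetch_status(base_url, "dse-sink-connector-for-orders"))` would list the failing task together with the same stack trace that appears in the worker log.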

We then enabled debug mode for SSL logging and found the following error:

s0-io-0, fatal error: 46: General SSLEngine problem
java.security.cert.CertificateException: No name matching test-db-cass0a01dc1 found
%% Invalidated:  [Session-60, TLS_RSA_WITH_AES_256_CBC_SHA]
s0-io-0, SEND TLSv1.2 ALERT:  fatal, description = certificate_unknown
s0-io-0, WRITE: TLSv1.2 Alert, length = 2
s0-io-0, fatal: engine already closed.  Rethrowing javax.net.ssl.SSLHandshakeException: General SSLEngine problem
s0-io-0, called closeOutbound()
s0-io-0, closeOutboundInternal()
s0-io-0, called closeOutbound()
s0-io-0, closeOutboundInternal()
s0-io-0, called closeOutbound()
s0-io-0, closeOutboundInternal()
s0-io-0, called closeInbound()
s0-io-0, fatal: engine already closed.  Rethrowing javax.net.ssl.SSLException: Inbound closed before receiving peer's close_notify: possible truncation attack?
s0-io-0, called closeOutbound()
s0-io-0, closeOutboundInternal()
s0-io-0, called closeOutbound()
s0-io-0, closeOutboundInternal()
Using SSLEngineImpl.
Allow unsafe renegotiation: false
Allow legacy hello messages: true
Is initial handshake: true
Is secure renegotiation: false

test-db-cass0a01dc1 is the hostname of the Cassandra server whose IP is 172.21.96.64.
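
The `No name matching test-db-cass0a01dc1 found` line in the SSL debug output suggests that the server certificate does not list that hostname. A rough way to check this is to compare the hostname against the names in the certificate. The sketch below assumes a certificate dictionary shaped like the one returned by Python's `ssl.SSLSocket.getpeercert()`. It is a simplification: it does exact matching only, ignores wildcards, and treats the common name as a candidate even when subject alternative names are present. The function names and the sample certificate are hypothetical.

```python
def cert_names(cert):
    """Collect DNS/IP subject alternative names and common names from a cert dict."""
    names = {
        value
        for kind, value in cert.get("subjectAltName", ())
        if kind in ("DNS", "IP Address")
    }
    # The subject is a tuple of RDNs, each a tuple of (key, value) pairs.
    for rdn in cert.get("subject", ()):
        for key, value in rdn:
            if key == "commonName":
                names.add(value)
    return names


def name_is_covered(cert, host):
    """Exact, case-insensitive match of host against the certificate names."""
    return host.lower() in {name.lower() for name in cert_names(cert)}


if __name__ == "__main__":
    # Hypothetical certificate contents, for illustration only.
    sample_cert = {
        "subject": ((("commonName", "node1.example.com"),),),
        "subjectAltName": (("DNS", "node1.example.com"),),
    }
    print(name_is_covered(sample_cert, "test-db-cass0a01dc1"))
```

If the hostname the driver resolves for the contact point is not among these names, hostname validation fails during the handshake, which would be consistent with the `certificate_unknown` alert in the log.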

Answer #1 (a0zr77ik)

You can disable hostname validation by adding the following parameter to the config section:

"datastax-java-driver.advanced.ssl-engine-factory.hostname-validation":"false"

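See the Kafka connector documentation for how to pass Java driver properties, and the Java driver configuration reference for the available settings.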
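
One way to apply this setting to an already registered connector is to send the full updated configuration to the Kafka Connect REST API with `PUT /connectors/<name>/config`. The sketch below only builds the request. The helper names, and the base URL shown in the usage note, are assumptions and do not come from the original thread.

```python
import json
from urllib.request import Request

HOSTNAME_VALIDATION_KEY = (
    "datastax-java-driver.advanced.ssl-engine-factory.hostname-validation"
)


def with_hostname_validation_disabled(config):
    """Return a copy of the connector config with hostname validation set to false."""
    updated = dict(config)
    updated[HOSTNAME_VALIDATION_KEY] = "false"
    return updated


def build_update_request(base_url, connector, config):
    """Build a PUT request whose body is the flat connector config map."""
    body = json.dumps(config).encode("utf-8")
    return Request(
        f"{base_url}/connectors/{connector}/config",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
```

Passing the result of `build_update_request(base_url, "dse-sink-connector-for-orders", with_hostname_validation_disabled(config))` to `urllib.request.urlopen` would submit the change, where `config` is the inner `"config"` map from the question and `base_url` points at the Connect worker. Separating request construction from sending lets the payload be inspected before anything reaches the worker.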
