我们希望dse接收器连接器连接到(dse-5.1.2)cassandra发行版(3.11.3.5112)。我们为kafka、kafka connect和cassandra提供了单独的linux设备。我们已将kafka-connect-dse-1.3.1.jar作为插件添加到kafka-connect,下面是我们为连接器注册的配置:
{
"name": "dse-sink-connector-for-orders",
"config": {"connector.class":"com.datastax.kafkaconnector.DseSinkConnector",
"topics":"action_log", "tasks.max":"1",
"ssl.provider":"JDK",
"ssl.truststore.path":"<<Path_To_trust_store>>",
"ssl.truststore.password":"<<password>>",
"auth.provider":"DSE", "auth.username":"<<username>>",
"auth.password":"<<password>>",
"contactPoints":"172.21.96.64",
"loadBalancing.localDc":"DC1", "port":"9042",
"topic.action_log.testc_netspend.dse_orders.consistencyLevel": "LOCAL_ONE",
"topic.action_log.testc_netspend.dse_orders.mapping":
"id=value.ACTION_ID,created=value.USER_ID, price=value.ACTION_ID, product=value.USER_ID, qty=value.ACTION_ID"
}
}
当我们击中https://172.21.96.64:8083/connectors/dse sink connector对于订单/状态,它显示连接器正在运行,但任务失败。我们在日志中发现以下错误:
[dse-sink-connector-for-orders|task-0] WorkerSinkTask{id=dse-sink-connector-for-orders-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
com.datastax.oss.driver.api.core.AllNodesFailedException: Could not reach any contact point, make sure you've provided valid addresses (showing first 3 nodes, use getAllErrors() for more): Node(endPoint=test-db-cass0a02dc1/172.21.96.64:9042, hostId=null, hashCode=5fa99a2e): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|id: 0xc4c37855, L:/172.21.117.41:52221 - R:test-db-cass0a02dc1/172.21.96.64:9042] Protocol initialization request, step 1 (OPTIONS): failed to send request (javax.net.ssl.SSLException: SSLEngine closed already)]
at com.datastax.oss.driver.api.core.AllNodesFailedException.copy(AllNodesFailedException.java:141)
at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)
at com.datastax.oss.driver.api.core.session.SessionBuilder.build(SessionBuilder.java:612)
at com.datastax.kafkaconnector.state.LifeCycleManager.buildCqlSession(LifeCycleManager.java:521)
at com.datastax.kafkaconnector.state.LifeCycleManager.lambda$startTask$0(LifeCycleManager.java:109)
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
at com.datastax.kafkaconnector.state.LifeCycleManager.startTask(LifeCycleManager.java:105)
at com.datastax.kafkaconnector.DseSinkTask.start(DseSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:300)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|id: 0xc4c37855, L:/172.21.117.41:52221 - R:test-db-cass0a02dc1/172.21.96.64:9042] Protocol initialization request, step 1 (OPTIONS): failed to send request (javax.net.ssl.SSLException: SSLEngine closed already)
at com.datastax.oss.driver.internal.core.channel.ProtocolInitHandler$InitRequest.fail(ProtocolInitHandler.java:342)
at com.datastax.oss.driver.internal.core.channel.ChannelHandlerRequest.writeListener(ChannelHandlerRequest.java:87)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:493)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:472)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:531)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:111)
at io.netty.util.internal.PromiseNotificationUtil.tryFailure(PromiseNotificationUtil.java:64)
at io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:57)
at io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:31)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:474)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:531)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:111)
at io.netty.util.internal.PromiseNotificationUtil.tryFailure(PromiseNotificationUtil.java:64)
at io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:57)
at io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:31)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:474)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:531)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:111)
at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:835)
at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797)
at io.netty.handler.ssl.SslHandler.handleUnwrapThrowable(SslHandler.java:1254)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1230)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1271)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
Caused by: javax.net.ssl.SSLException: SSLEngine closed already
at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:834)
... 23 more
尝试为ssl日志启用调试模式,发现以下错误:
s0-io-0, fatal error: 46: General SSLEngine problem
java.security.cert.CertificateException: No name matching test-db-cass0a01dc1 found
%% Invalidated: [Session-60, TLS_RSA_WITH_AES_256_CBC_SHA]
s0-io-0, SEND TLSv1.2 ALERT: fatal, description = certificate_unknown
s0-io-0, WRITE: TLSv1.2 Alert, length = 2
s0-io-0, fatal: engine already closed. Rethrowing javax.net.ssl.SSLHandshakeException: General SSLEngine problem
s0-io-0, called closeOutbound()
s0-io-0, closeOutboundInternal()
s0-io-0, called closeOutbound()
s0-io-0, closeOutboundInternal()
s0-io-0, called closeOutbound()
s0-io-0, closeOutboundInternal()
s0-io-0, called closeInbound()
s0-io-0, fatal: engine already closed. Rethrowing javax.net.ssl.SSLException: Inbound closed before receiving peer's close_notify: possible truncation attack?
s0-io-0, called closeOutbound()
s0-io-0, closeOutboundInternal()
s0-io-0, called closeOutbound()
s0-io-0, closeOutboundInternal()
Using SSLEngineImpl.
Allow unsafe renegotiation: false
Allow legacy hello messages: true
Is initial handshake: true
Is secure renegotiation: false
test-db-cass0a01dc1是ip为172.21.96.64的cassandra服务器的主机名
1条答案
按热度按时间a0zr77ik1#
您可以添加到
config
以下参数部分禁用主机名验证:有关传递java驱动程序属性和java驱动程序配置参考,请参阅kafka连接器的文档。