How to use a CA certificate in the Elasticsearch sink connector configuration for Kafka Connect on Confluent Platform?

0h4hbjxa posted on 2021-06-04 in Kafka

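I am currently trying to configure the Elasticsearch sink connector on a Kafka Connect cluster running in distributed mode. The cluster is deployed on Kubernetes using the Helm charts provided by Confluent. Here are the properties from the JSON file (I connect to Elastic with a username and password, which I have removed from the JSON file for security reasons):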

```
{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "dropPrefix",
    "config.action.reload": "restart",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "topics": "_audit_log",
    "errors.deadletterqueue.topic.name": "_audit_log_dead_letter_queue",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": ".*",
    "transforms.dropPrefix.replacement": "audit_log",
    "connection.url": "https://path_to_elastic_cloud:9200",
    "auto.create.indices.at.start": "false",
    "type.name": "",
    "key.ignore": "true",
    "schema.ignore": "true",
    "drop.invalid.message": "true",
    "elastic.ca.cert.path": "/opt/xyz/elastic/certs/tls.crt",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}
```

But here are the error logs from the Connect cluster:

```
[INFO] 2020-11-04 19:13:53,384 [task-thread-Brians-0] io.searchbox.client.AbstractJestClient setServers - Setting server pool to a list of 1 servers: [https://path_to_elastic_clound:9200]
[INFO] 2020-11-04 19:13:53,385 [task-thread-Brians-0] io.searchbox.client.JestClientFactory getConnectionManager - Using multi thread/connection supporting pooling connection manager
[INFO] 2020-11-04 19:13:53,386 [task-thread-Brians-0] io.searchbox.client.JestClientFactory getObject - Using default GSON instance
[INFO] 2020-11-04 19:13:53,386 [task-thread-Brians-0] io.searchbox.client.JestClientFactory getObject - Node Discovery disabled...
[INFO] 2020-11-04 19:13:53,386 [task-thread-Brians-0] io.searchbox.client.JestClientFactory getObject - Idle connection reaping enabled...
[INFO] 2020-11-04 19:13:53,387 [task-thread-Brians-0] io.searchbox.client.JestClientFactory getObject - Authentication cache set for preemptive authentication
[ERROR] 2020-11-04 19:13:53,410 [task-thread-Brians-0] org.apache.kafka.connect.runtime.WorkerTask doRun - WorkerSinkTask{id=Brians-0} Task threw an uncaught and unrecoverable exception
org.apache.kafka.connect.errors.ConnectException: Couldn't start ElasticsearchSinkTask due to connection error:
    at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:168)
    at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:152)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:74)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:48)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:302)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:326)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:269)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:264)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1339)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1214)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1157)
    at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
    at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:444)
    at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:422)
    at java.base/sun.security.ssl.TransportContext.dispatch(TransportContext.java:183)
    at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:171)
    at java.base/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1403)
    at java.base/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1309)
    at java.base/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:440)
    at java.base/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:411)
    at org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:396)
    at org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:355)
    at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:359)
    at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
    at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
    at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:111)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at io.searchbox.client.http.JestHttpClient.executeRequest(JestHttpClient.java:133)
    at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:70)
    at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:63)
    at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.getServerVersion(JestElasticsearchClient.java:316)
    at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:161)
    ... 12 more
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)
    at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)
    at java.base/sun.security.validator.Validator.validate(Validator.java:264)
    at java.base/sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:313)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:222)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:129)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1323)
    ... 39 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
    at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
    at java.base/java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297)
    at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434)
```

Any pointers are deeply appreciated. Thank you.

w80xi6nr #1

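This issue has been resolved. Previously we were not using any keys from a keystore. After following the instructions in this GitHub issue, we were finally able to add the following to the configuration (not the actual values). It was very helpful.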
https://github.com/confluentinc/kafka-connect-elasticsearch/issues/432

"elastic.security.protocol": "SSL",
"elastic.https.ssl.keystore.location": "/mnt/secrets/elastic/keystore.jks",
"elastic.https.ssl.keystore.password": "changeit",
"elastic.https.ssl.truststore.location": "/mnt/secrets/elastic/truststore.jks",
"elastic.https.ssl.truststore.password": "changeit"
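
For anyone wiring this up, here is a minimal Python sketch of the two steps involved, under some assumptions. The helper names `keytool_import_command` and `with_ssl_settings` are hypothetical, as is the `elastic-ca` alias. The property keys and paths are the placeholder values from this thread. The first helper only builds a standard JDK `keytool -importcert` command that would load a PEM CA certificate into a JKS truststore; it does not run it. The keystore arguments are optional in this sketch, on the assumption that they are only needed when Elasticsearch asks for a client certificate.

```python
import json


def keytool_import_command(ca_cert_path, truststore_path, store_password,
                           alias="elastic-ca"):
    """Build (but do not run) a keytool command that imports a CA certificate
    into a JKS truststore. The alias is an arbitrary, hypothetical label."""
    return [
        "keytool", "-importcert", "-noprompt",
        "-alias", alias,
        "-file", ca_cert_path,
        "-keystore", truststore_path,
        "-storetype", "JKS",
        "-storepass", store_password,
    ]


def with_ssl_settings(config, truststore_path, truststore_password,
                      keystore_path=None, keystore_password=None):
    """Return a copy of a connector config with the SSL properties from the
    answer added. The input dict is left unmodified."""
    updated = dict(config)
    updated["elastic.security.protocol"] = "SSL"
    updated["elastic.https.ssl.truststore.location"] = truststore_path
    updated["elastic.https.ssl.truststore.password"] = truststore_password
    # Assumption: the keystore is only needed for client-certificate auth.
    if keystore_path is not None:
        updated["elastic.https.ssl.keystore.location"] = keystore_path
        updated["elastic.https.ssl.keystore.password"] = keystore_password
    return updated


if __name__ == "__main__":
    # Placeholder values taken from the question and answer, not real secrets.
    command = keytool_import_command(
        "/opt/xyz/elastic/certs/tls.crt",
        "/mnt/secrets/elastic/truststore.jks",
        "changeit",
    )
    print(" ".join(command))

    base_config = {
        "connector.class":
            "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "https://path_to_elastic_cloud:9200",
    }
    secured = with_ssl_settings(
        base_config,
        "/mnt/secrets/elastic/truststore.jks",
        "changeit",
    )
    print(json.dumps(secured, indent=2))
```

The printed command can be run wherever a JDK is available, and the resulting truststore file then has to be mounted into the Connect pods at the path used in the config. The printed dict is the `config` object you would submit to the Kafka Connect REST API.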
