I am trying to set up a new Elasticsearch sink job on a Kafka Connect cluster. The cluster has been working smoothly for a couple of months, connecting securely to Kafka via SASL_SSL and to an Elasticsearch instance on host A via HTTPS.
The Kafka Connect cluster normally runs in Kubernetes, but for testing purposes I also run it locally with Docker (an image based on Confluent's Kafka Connect image v6.0.0), with Kafka residing in a test environment and the jobs started via REST calls.
The docker-compose file used to run it locally looks like this:
version: '3.7'
services:
  connect:
    build:
      dockerfile: Dockerfile.local
      context: ./
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      KAFKA_OPTS: -Djava.security.krb5.conf=/<path-to>/secrets/krb5.conf
        -Djava.security.auth.login.config=/<path-to>/rest-basicauth-jaas.conf
      CONNECT_BOOTSTRAP_SERVERS: <KAFKA-INSTANCE-1>:2181,<KAFKA-INSTANCE-2>:2181,<KAFKA-INSTANCE-3>:2181
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_REST_PORT: 8083
      CONNECT_REST_EXTENSION_CLASSES: org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension
      CONNECT_GROUP_ID: <kc-group>
      CONNECT_CONFIG_STORAGE_TOPIC: service-assurance.test.internal.connect.configs
      CONNECT_OFFSET_STORAGE_TOPIC: service-assurance.test.internal.connect.offsets
      CONNECT_STATUS_STORAGE_TOPIC: service-assurance.test.internal.connect.status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.IntegerConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: <KAFKA-INSTANCE-1>:2181,<KAFKA-INSTANCE-2>:2181,<KAFKA-INSTANCE-3>:2181
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_KERBEROS_SERVICE_NAME: "kafka"
      CONNECT_SASL_JAAS_CONFIG: com.sun.security.auth.module.Krb5LoginModule required \
        useKeyTab=true \
        storeKey=true \
        keyTab="/<path-to>/kafka-connect.keytab" \
        principal="<AD-USER>";
      CONNECT_SASL_MECHANISM: GSSAPI
      CONNECT_SSL_TRUSTSTORE_LOCATION: "/<path-to>/truststore.jks"
      CONNECT_SSL_TRUSTSTORE_PASSWORD: <pwd>
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_SASL_KERBEROS_SERVICE_NAME: "kafka"
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: com.sun.security.auth.module.Krb5LoginModule required \
        useKeyTab=true \
        storeKey=true \
        keyTab="/<path-to>/kafka-connect.keytab" \
        principal="<AD-USER>";
      CONNECT_CONSUMER_SASL_MECHANISM: GSSAPI
      CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: "/<path-to>/truststore.jks"
      CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: <pwd>
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
The Kubernetes configuration is similar.
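For reference, this is roughly how I bring up the local worker and sanity-check it before posting any connector (a sketch; the /connector-plugins endpoint is part of the standard Connect REST API, and <host> stands for the worker's REST address as in the curl calls below):

# bring up the local Kafka Connect worker
docker-compose up -d connect

# confirm the REST API answers and the Elasticsearch sink plugin is on the plugin path
curl -s <host>/connector-plugins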
The connector is started like this:
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "connector-name",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": 2,
    "batch.size": 200,
    "max.buffered.records": 1500,
    "flush.timeout.ms": 120000,
    "topics": "topic.connector",
    "auto.create.indices.at.start": false,
    "key.ignore": true,
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "schema.ignore": true,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "behavior.on.malformed.documents": "ignore",
    "behavior.on.null.values": "ignore",
    "connection.url": "https://<elastic-host>",
    "connection.username": "<user>",
    "connection.password": "<pwd>",
    "type.name": "_doc"
  }
}' <host>/connectors/
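Once posted, I check whether the connector and its tasks came up or failed via the status endpoint (a sketch, using the standard Connect REST API; a failed task reports its stack trace in the trace field):

# show connector and task state for the connector posted above
curl -s <host>/connectors/connector-name/status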
Now my task is to set up another connector, this time writing to an Elasticsearch instance hosted on host B. The problem I am running into is the infamous:
sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
I have modified the truststore used by the worker so that it also contains the CA root certificate of host B. I believe the truststore itself is fine, because I can use it to connect successfully to both A and B from a small Java snippet (SSLPoke.class, found on an Atlassian page).
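Concretely, that check looked roughly like this (a sketch; SSLPoke is the standalone test class from the Atlassian knowledge base, and the host names and port are placeholders for my actual endpoints):

# confirm both CA root certificates are present in the updated truststore
keytool -list -keystore /<path-to>/truststore.jks -storepass <pwd>

# test a TLS handshake against each Elasticsearch host using only that truststore
java -Djavax.net.ssl.trustStore=/<path-to>/truststore.jks \
     -Djavax.net.ssl.trustStorePassword=<pwd> \
     SSLPoke <elastic-host-a> 443
java -Djavax.net.ssl.trustStore=/<path-to>/truststore.jks \
     -Djavax.net.ssl.trustStorePassword=<pwd> \
     SSLPoke <elastic-host-b> 443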
The connector that talks to host A still works with the newly updated truststore, but the connector that talks to host B does not.
I have searched the internet for clues on how to solve this and found suggestions that the following needs to be added explicitly to the connector configuration:
"elastic.https.ssl.truststore.location": "/<pathto>/truststore.jks",
"elastic.https.ssl.truststore.password": "<pwd>",
Other pages suggested adding the truststore to the Kafka Connect worker configuration via KAFKA_OPTS instead, like this:
KAFKA_OPTS: -Djava.security.krb5.conf=/<path-to>/secrets/krb5.conf
-Djava.security.auth.login.config=/<path-to>/rest-basicauth-jaas.conf
-Djavax.net.ssl.trustStore=/<path-to>/truststore.jks
Following these suggestions, I can indeed get the connector for host B to start successfully. But here comes the annoying part: with the extra -Djavax.net.ssl.trustStore parameter added to KAFKA_OPTS, my old connector for host A stops working with the very same error! So now I am in a situation where either the connector for A works or the connector for B works, but never both at the same time.
I would be very grateful for any suggestions or ideas on how to solve this, because it is driving me crazy.