启动新的基于kafka connect ssl的连接器时出现问题

8yoxcaq7  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(300)

我正在尝试在kafkaconnect集群上设置一个新的elasticsearchsink作业。这个集群已经平稳地工作了几个月,通过sasl-ssl安全连接到kafka,通过https连接到主机a上的一个弹性示例。
kc集群通常在kubernetes中运行,但出于测试目的,我还使用docker(基于confluent的kc image v6.0.0的映像)在本地运行它,kafka驻留在测试环境中,作业使用rest调用启动。
用于在本地运行它的docker合成文件如下所示

version: '3.7'
services:
  connect:
    build:
      dockerfile: Dockerfile.local
      context: ./
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      KAFKA_OPTS: -Djava.security.krb5.conf=/<path-to>/secrets/krb5.conf 
                  -Djava.security.auth.login.config=/<path-to>/rest-basicauth-jaas.conf
      CONNECT_BOOTSTRAP_SERVERS: <KAFKA-INSTANCE-1>:2181,<KAFKA-INSTANCE-2>:2181,<KAFKA-INSTANCE-3>:2181
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_REST_PORT: 8083
      CONNECT_REST_EXTENSION_CLASSES: org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension
      CONNECT_GROUP_ID: <kc-group>
      CONNECT_CONFIG_STORAGE_TOPIC: service-assurance.test.internal.connect.configs
      CONNECT_OFFSET_STORAGE_TOPIC: service-assurance.test.internal.connect.offsets
      CONNECT_STATUS_STORAGE_TOPIC: service-assurance.test.internal.connect.status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.IntegerConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: <KAFKA-INSTANCE-1>:2181,<KAFKA-INSTANCE-2>:2181,<KAFKA-INSTANCE-3>:2181
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_KERBEROS_SERVICE_NAME: "kafka"
      CONNECT_SASL_JAAS_CONFIG: com.sun.security.auth.module.Krb5LoginModule required \
                                useKeyTab=true \
                                storeKey=true \
                                keyTab="/<path-to>/kafka-connect.keytab" \
                                principal="<AD-USER>";
      CONNECT_SASL_MECHANISM: GSSAPI
      CONNECT_SSL_TRUSTSTORE_LOCATION: "/<path-to>/truststore.jks"
      CONNECT_SSL_TRUSTSTORE_PASSWORD: <pwd>
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_SASL_KERBEROS_SERVICE_NAME: "kafka"
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: com.sun.security.auth.module.Krb5LoginModule required \
                                useKeyTab=true \
                                storeKey=true \
                                keyTab="/<path-to>/kafka-connect.keytab" \
                                principal="<AD-USER>";
      CONNECT_CONSUMER_SASL_MECHANISM: GSSAPI
      CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: "/<path-to>/truststore.jks"
      CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: <pwd>
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"

具有类似的库伯内德配置。
连接器的启动方式如下:

curl  -X POST -H "Content-Type: application/json" --data '{
  "name": "connector-name",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": 2,
    "batch.size": 200,
    "max.buffered.records": 1500,
    "flush.timeout.ms": 120000,
    "topics": "topic.connector",
    "auto.create.indices.at.start": false,
    "key.ignore": true,
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "schema.ignore": true,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "behavior.on.malformed.documents" : "ignore",
    "behavior.on.null.values": "ignore",
    "connection.url": "https://<elastic-host>",
    "connection.username": "<user>",
    "connection.password": "<pwd>",
    "type.name": "_doc"
  }
}' <host>/connectors/

现在,我的任务是安装另一个连接器,这次托管在主机b上。我遇到的问题是臭名昭著的:

sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target

我已经修改了工作的信任库,以同时包含主机b的ca根证书。我相信信任库正在工作,因为我能够从java代码片段(实际上在atlassian页面sslpoke.class上找到)使用它成功地连接到a和b。
连接到主机a的连接器仍与新更新的信任库一起工作,但与主机b连接的连接器不工作。
我浏览了一下互联网,寻找如何解决这个问题的线索,发现了一些建议,需要明确补充:

"elastic.https.ssl.truststore.location": "/<pathto>/truststore.jks",
"elastic.https.ssl.truststore.password": "<pwd>",

连接到连接器配置。其他一些页面建议将信任库添加到kc配置kafka\u选项中,如下所示:

KAFKA_OPTS: -Djava.security.krb5.conf=/<path-to>/secrets/krb5.conf 
              -Djava.security.auth.login.config=/<path-to>/rest-basicauth-jaas.conf
              -Djavax.net.ssl.trustStore=/<path-to>/truststore.jks

按照这些建议,我实际上可以让连接到主机b的连接器成功启动。但现在是涂油部分。添加额外的参数到Kafka,我的旧连接器连接到一个停止工作同样的错误!所以现在我有一个例子,要么连接器连接到a,要么连接器连接到b,但不能同时工作。
请,如果有人能给我一些关于如何解决这个问题的建议或想法,将不胜感激,因为这让我发疯。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题