使用spring boot和apache camel连接kafka集群时发生ssl异常

41zrol4v  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(558)

我的要求是通过ssl与spring boot和apache camel连接kafka主题,我已经编写了下面的代码,但是遇到了一个错误,如sun.security.validator.validatorexception:pkix路径生成失败:sun.security.provider.certpath.suncertpathbuilderexception:找不到请求的目标的有效证书路径
任何人,请帮助我如何解决这个错误。

//in this code i'm configured the SSL
    @Configuration
    public class Testing {
        @Bean
         SSLContextParameters sslContextParameters(){
            KeyStoreParameters store = new KeyStoreParameters();
            store.setResource("kafka.client.truststore.jks");
            store.setPassword("123456");

            TrustManagersParameters trust = new TrustManagersParameters();
            trust.setKeyStore(store);

            SSLContextParameters parameters = new SSLContextParameters();
            parameters.setTrustManagers(trust);

            return parameters;
          }

    }

在下面的文件中,我用sslcontextparameters调用路由器

@Autowired
    SSLContextParameters params;
@Override
    public void configure() throws Exception {
    from("{{timerOnce}}").process(consumerCreate).to(
                "https://xx.xx.xx.xxx/consumers/group-id?sslContextParameters=params");

}

******我使用了另一种方法通过ssl连接kafka群集,但很不幸遇到了这样的异常******org.apache.camel.spring.boot.camelspringbootinitializationexception:java.io.ioexception:无效的密钥库格式

下面的代码,我启用了ssl

public Endpoint setupSSLConext(CamelContext camelContext) throws Exception {

        KeyStoreParameters keyStoreParameters = new KeyStoreParameters();
        // Change this path to point to your truststore/keystore as jks files
        keyStoreParameters.setResource("kafka.client.truststore.jks");
        keyStoreParameters.setPassword("123456");

        KeyManagersParameters keyManagersParameters = new KeyManagersParameters();
        keyManagersParameters.setKeyStore(keyStoreParameters);
        keyManagersParameters.setKeyPassword("123456");

        TrustManagersParameters trustManagersParameters = new TrustManagersParameters();
        trustManagersParameters.setKeyStore(keyStoreParameters);

        SSLContextParameters sslContextParameters = new SSLContextParameters();
        sslContextParameters.setKeyManagers(keyManagersParameters);
        sslContextParameters.setTrustManagers(trustManagersParameters);

        HttpComponent httpComponent = camelContext.getComponent("https4", HttpComponent.class);
        httpComponent.setSslContextParameters(sslContextParameters);

        // This is important to make your cert skip CN/Hostname checks
        httpComponent.setX509HostnameVerifier(new X509HostnameVerifier() {
            @Override
            public void verify(String s, SSLSocket sslSocket) throws IOException {

            }

            @Override
            public void verify(String s, X509Certificate x509Certificate) throws SSLException {

            }

            @Override
            public void verify(String s, String[] strings, String[] strings1) throws SSLException {

            }

            @Override
            public boolean verify(String s, SSLSession sslSession) {
                // I don't mind just return true for all or you can add your own logic
                return true;
            }

        });

        return     httpComponent.createEndpoint("https://XX.XX.X.XXX/consumers/");
    }

下面是我使用的路由器的代码

public void configure() throws Exception {

        Endpoint createEndpoint = cdcHelper.setupSSLConext(context);

        from("{{timerOnce}}").process(consumerCreate)
                .to(createEndpoint);    // calling kafka consumer 

    }
}
b4wnujal

b4wnujal1#

您可以按照下面的方法使用apachecamel和springboot来设置kafka消费者。
将以下属性添加到application.properties


# kafka configuration

kafka.topic=iot1
kafka.camelKafkaOptions.groupId=grp1
kafka.camelKafkaOptions.brokers=kafka.localtest:9093
kafka.camelKafkaOptions.consumersCount=10
kafka.camelKafkaOptions.autoOffsetReset=latest
kafka.camelKafkaOptions.autoCommitEnable=false
kafka.camelKafkaOptions.allowManualCommit=true
kafka.camelKafkaOptions.metadataMaxAgeMs=5000
kafka.camelKafkaOptions.securityProtocol=SSL
kafka.camelKafkaOptions.sslEndpointAlgorithm=HTTPS
kafka.camelKafkaOptions..sslKeyPassword=<ssl key password>
kafka.camelKafkaOptions..sslKeystoreLocation=<keystorepath>
kafka.camelKafkaOptions.sslKeystorePassword=<sslkeystore password>
kafka.camelKafkaOptions.sslTruststoreLocation=<truststore path>
kafka.camelKafkaOptions.sslTruststorePassword=<password>

and create a utility method, to construct a kafka url

@Component
public class KafkaUtility {
      public String getKafkaEndpoint(String topicName ){
       StringBuilder urlBuilder = new StringBuilder("kafka:" + topicName);

        if (!getCamelKafkaOptions().isEmpty()) {
            urlBuilder.append("&");
            getCamelKafkaOptions().forEach(
                (key, value) -> {
                    if (StringUtils.isNotBlank(value)) {
                        appendConfig(urlBuilder, key, value);
                    }
                }
            );
        }
        // strip the last "&" symbol
         String kafkaURL = stripLastAnd(urlBuilder.toString());
        return kafkaURL;
   }
}

In your route builder, implement the below

    @Autowired
    private KafkaUtility kafkaUtility;

  from(kafkaUtility.getKafkaEndpoint())
  .process("yourprocessor")
  .to("tourl");

相关问题