我的要求是通过ssl与spring boot和apache camel连接kafka主题,我已经编写了下面的代码,但是遇到了一个错误,如sun.security.validator.validatorexception:pkix路径生成失败:sun.security.provider.certpath.suncertpathbuilderexception:找不到请求的目标的有效证书路径
任何人,请帮助我如何解决这个错误。
//in this code i'm configured the SSL
@Configuration
public class Testing {
@Bean
SSLContextParameters sslContextParameters(){
KeyStoreParameters store = new KeyStoreParameters();
store.setResource("kafka.client.truststore.jks");
store.setPassword("123456");
TrustManagersParameters trust = new TrustManagersParameters();
trust.setKeyStore(store);
SSLContextParameters parameters = new SSLContextParameters();
parameters.setTrustManagers(trust);
return parameters;
}
}
在下面的文件中,我用sslcontextparameters调用路由器
@Autowired
SSLContextParameters params;
@Override
public void configure() throws Exception {
from("{{timerOnce}}").process(consumerCreate).to(
"https://xx.xx.xx.xxx/consumers/group-id?sslContextParameters=params");
}
******我使用了另一种方法通过ssl连接kafka群集,但很不幸遇到了这样的异常******org.apache.camel.spring.boot.camelspringbootinitializationexception:java.io.ioexception:无效的密钥库格式
下面的代码,我启用了ssl
public Endpoint setupSSLConext(CamelContext camelContext) throws Exception {
KeyStoreParameters keyStoreParameters = new KeyStoreParameters();
// Change this path to point to your truststore/keystore as jks files
keyStoreParameters.setResource("kafka.client.truststore.jks");
keyStoreParameters.setPassword("123456");
KeyManagersParameters keyManagersParameters = new KeyManagersParameters();
keyManagersParameters.setKeyStore(keyStoreParameters);
keyManagersParameters.setKeyPassword("123456");
TrustManagersParameters trustManagersParameters = new TrustManagersParameters();
trustManagersParameters.setKeyStore(keyStoreParameters);
SSLContextParameters sslContextParameters = new SSLContextParameters();
sslContextParameters.setKeyManagers(keyManagersParameters);
sslContextParameters.setTrustManagers(trustManagersParameters);
HttpComponent httpComponent = camelContext.getComponent("https4", HttpComponent.class);
httpComponent.setSslContextParameters(sslContextParameters);
// This is important to make your cert skip CN/Hostname checks
httpComponent.setX509HostnameVerifier(new X509HostnameVerifier() {
@Override
public void verify(String s, SSLSocket sslSocket) throws IOException {
}
@Override
public void verify(String s, X509Certificate x509Certificate) throws SSLException {
}
@Override
public void verify(String s, String[] strings, String[] strings1) throws SSLException {
}
@Override
public boolean verify(String s, SSLSession sslSession) {
// I don't mind just return true for all or you can add your own logic
return true;
}
});
return httpComponent.createEndpoint("https://XX.XX.X.XXX/consumers/");
}
下面是我使用的路由器的代码
public void configure() throws Exception {
Endpoint createEndpoint = cdcHelper.setupSSLConext(context);
from("{{timerOnce}}").process(consumerCreate)
.to(createEndpoint); // calling kafka consumer
}
}
1条答案
按热度按时间b4wnujal1#
您可以按照下面的方法使用apachecamel和springboot来设置kafka消费者。
将以下属性添加到application.properties