I am trying to connect to Kafka 3.0 over SSL, but I am running into a problem loading the SSL keystore.
I have tried many possible values, but nothing helped.
I have also tried changing the location, and the value of the location, but it still doesn't work:
package uk.co.argos.services.pas.StepDefinations;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class Test {

    public static List<String> test1 = new ArrayList<>();
    public static List<String> test2 = new ArrayList<>();
    public static String BootStrapServers = "kafka-apps2-1.eu-west-1.dev.deveng.systems:9093,kafka-apps2-2.eu-west-1.dev.deveng.systems:9093,kafka-apps2-3.eu-west-1.dev.deveng.systems:9093";
    public static String iODErrorTopicName = "argos-dev-carrier-preadvice-updates-v1";
    public static Consumer<String, String> createConsumer(String BOOTSTRAPSERVERS, String Topic) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAPSERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put("security.protocol", "SSL");
        props.put("ssl.protocol", "SSL");
props.put("ssl.truststore.location","/kafka.truststore.jks");
props.put("ssl.truststore.password","changeit");
props.put("ssl.keystore.location","/kafka.keystore.jks");
props.put("ssl.keystore.type","JKS");
props.put("ssl.keystore.password","scdt@best");
props.put("ssl.key.password","scdtisbest");
        // Create the consumer using props.
        final Consumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(Topic));
        return consumer;
    }
    public static void ReadMessageinKafka_iODErrorTopic(String OrderNo) throws ExecutionException, InterruptedException {
        final Consumer<String, String> consumer = createConsumer(BootStrapServers, iODErrorTopicName);
        final int giveUp = 25;
        int noRecordsCount = 0;
        while (true) {
            final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            if (consumerRecords.count() == 0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }
            consumerRecords.forEach(record -> {
                System.out.println("Consumer Record: " + record.value());
                if (record.value().contains(OrderNo)) {
                    String inValidRecord = record.value();
                    System.out.println("\nFOUND THE MESSAGE");
                    assertNotNull(inValidRecord);
                } else {
                    System.out.println("\nMessage not found in Kafka");
                    assertEquals("2", "3"); // deliberately fail when the order is missing
                }
            });
            consumer.commitAsync();
        }
        consumer.close();
        System.out.println("Found the Invalid Message in Kafka - iOD Error Topic");
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ReadMessageinKafka_iODErrorTopic("AD106393581");
    }
}
The error I am facing:
11:33:58.649 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [kafka-apps2-1.eu-west-1.dev.deveng.systems:9093, kafka-apps2-2.eu-west-1.dev.deveng.systems:9093, kafka-apps2-3.eu-west-1.dev.deveng.systems:9093]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = KafkaExampleConsumer
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = /kafka.keystore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = SSL
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /kafka.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
11:33:58.668 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=KafkaExampleConsumer] Initializing the Kafka consumer
11:33:59.046 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(id = null, nodes = [kafka-apps2-1.eu-west-1.dev.deveng.systems:9093 (id: -1 rack: null), kafka-apps2-3.eu-west-1.dev.deveng.systems:9093 (id: -3 rack: null), kafka-apps2-2.eu-west-1.dev.deveng.systems:9093 (id: -2 rack: null)], partitions = [], controller = null)
11:34:00.990 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=KafkaExampleConsumer] Kafka consumer has been closed
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:650)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630)
at uk.co.argos.services.pas.StepDefinations.Test.createConsumer(Test.java:63)
at uk.co.argos.services.pas.StepDefinations.Test.ReadMessageinKafka_iODErrorTopic(Test.java:71)
at uk.co.argos.services.pas.StepDefinations.Test.main(Test.java:103)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /kafka.keystore.jks of type JKS
at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:64)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:713)
... 5 more
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /kafka.keystore.jks of type JKS
at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:137)
at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:62)
... 9 more
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /kafka.keystore.jks of type JKS
at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:330)
at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:218)
at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:135)
... 10 more
Caused by: java.io.FileNotFoundException: \kafka.keystore.jks (The system cannot find the file specified)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at java.io.FileInputStream.<init>(FileInputStream.java:93)
at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:323)
... 12 more
Process finished with exit code 1
Can anyone help? Is there something wrong with the keystore? As it says, "Failed to load SSL keystore /kafka.keystore.jks of type JKS".
2 Answers
doinxwow1#
The error seems pretty clear...

Caused by: java.io.FileNotFoundException: \kafka.keystore.jks (The system cannot find the file specified)
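The JVM resolves /kafka.keystore.jks against the root of the current drive (hence the \kafka.keystore.jks in the trace), not against your project directory. Here is a minimal sketch to check where that location actually lands before handing it to Kafka; the path is the one from the question, and the suggested replacement path is hypothetical:

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class KeystorePathCheck {
    public static void main(String[] args) {
        // The location from the consumer config in the question.
        Path keystore = Paths.get("/kafka.keystore.jks");
        // On Windows this prints something like C:\kafka.keystore.jks.
        System.out.println("Resolves to: " + keystore.toAbsolutePath());
        System.out.println("Exists: " + Files.exists(keystore));
        // If this prints false, point ssl.keystore.location at an absolute
        // path that exists, e.g. "C:/certs/kafka.keystore.jks" (hypothetical).
    }
}

If "Exists" prints false, the consumer will fail exactly as shown above.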
ippsafx72#
I had the same problem. Unfortunately, Kafka cannot read the keystore from the classpath if it is a resource inside a war or jar. I solved it by reading the resource, writing it to a temporary file, and passing that file's absolute path to the Kafka configuration. See the sketch below.
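A minimal sketch of that workaround, assuming the keystore is bundled on the classpath under the (hypothetical) resource name kafka.keystore.jks:

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class KeystoreExtractor {
    // Copies a classpath resource to a temp file and returns its absolute path,
    // which is a location Kafka's SSL engine can actually open.
    public static String extractToTempFile(String resourceName) throws IOException {
        try (InputStream in = KeystoreExtractor.class.getClassLoader()
                .getResourceAsStream(resourceName)) {
            if (in == null) {
                throw new FileNotFoundException(resourceName + " not found on classpath");
            }
            Path temp = Files.createTempFile("kafka-keystore", ".jks");
            temp.toFile().deleteOnExit();
            Files.copy(in, temp, StandardCopyOption.REPLACE_EXISTING);
            return temp.toAbsolutePath().toString();
        }
    }
}

Then pass the result to the consumer config, e.g.:

props.put("ssl.keystore.location", KeystoreExtractor.extractToTempFile("kafka.keystore.jks"));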