关于这个主题有很多问题,但是,这不是一个重复的问题!
我面临的问题是,我试图用Java14和Kafka2.5.0建立一个springboot项目,而我的使用者返回一个空的记录列表。这里的大多数答案表示一些被遗忘的属性,要经常轮询或将偏移模式设置为最早。
尽管我的配置设置看起来很不常规(请参阅下面代码段中我对jaas.conf的设置),但我看不出与docs.confluent.io有任何逻辑上的区别。
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public KafkaConsumer<Long, MyClass> consumerConfigs() {
Properties config = new Properties();
config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);
System.setProperty("java.security.auth.login.config", ".\\src\\main\\resources\\jaas.conf");
return new KafkaConsumer<>(config);
}
}
不过,这是可行的。我没有收到任何例外(Kafka,或其他),和连接。
// jaas.conf-file
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="serviceName"
username="username"
password="password";
};
这里是我真正投票的地方:
try {
KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
consumer.subscribe(Collections.singletonList(inputTopic));
int count = 0;
Long start = System.currentTimeMillis();
Long end = System.currentTimeMillis();
while (end - start < 900_000) {
// boolean would be set to true in production
ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> {
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
System.out.println(result);
});
consumer.commitSync();
System.out.println("visualize number of loops made: " + ++count);
end = System.currentTimeMillis();
}
} catch (KafkaException e) {
e.printStackTrace();
} catch (Exception e) {
System.out.println(e.getMessage());
}
我添加了指纹和其他杂物,以试图找到问题。我在调试模式下运行程序,并将断点放在这一行:
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
结果,我看到一个打印行,每秒钟计数一次,正如人们所料。但由于我的消费者没有返回任何记录,因此它从未进入 forEach
这样就不会触发我的断点。
我肯定能在云中看到我的主题,有两个分区。消息是源源不断地生成的,所以我知道我应该能够找到一些东西。
我知道连接到集群需要一些时间,但是由于当前时间被设置为一刻钟,我至少应该收到一些东西,对吗?作为一种选择,我试着切换 consumer.subscribe()
到 consumer.assign()
方法,将使用者设置为 consumer.seekToBeginning()
. 它运行良好,但也没有返回任何东西。
另一件在最常见的例子中找不到的事情是,我使用自己的类。所以 KafkaConsumer<String, String>
,我根据本教程实现了自定义(反)序列化程序。
可能是我的配置设置吗?投票超时有问题吗?(反)序列化,还是完全其他的东西?我真的找不出任何理由来解释为什么我没有记录。任何反馈都将不胜感激!
1条答案
按热度按时间k97glaaz1#
问题解决了。从我发布的问题中你无法确定,不过,我想澄清一些事情,如果其他人发现自己被类似的配置困住了。
验证收到的密码是否正确。掌纹
就这样,我以为他在和集群建立连接,但我的循环一直在打印计数,因为
.poll(Duration.ofMillis(1000))
方法被执行->检查他是否能在给定的超时内连接->如果连接失败,则继续返回零记录。不会引发错误。正常情况下,2秒左右之后,就应该接通了。检查与数据库的连接。
你永远不希望应用程序停止,这就是我设计的原因
myOwnKafkaService.getSomethingFromRecord(record.key(), record.value())
方法来记录所有错误,但捕获所有异常。直到我检查了日志,我才意识到我访问远程数据库的权限不正常。时间戳应该反序列化为java.util.date
错误地分析它会抛出一个异常,但我的方法返回了一个异常
null
. 正如所有的评论在这个答案,这一个也归结为缺乏经验,在这样的设置。您将发现下面经过修正的类可以作为一个工作示例(但完全不是最佳实践)。Kafka小说:
轮询方法的主体:
带有反序列化程序的myclass的小示例:
我希望这能为将来的其他人服务。我从挫折和错误中学到了不少东西。