我已经将我的Kafka从版本0.10.2.0更新到版本ón 2.1.0,现在Kafka不能使用消息。我使用的是spring boot,下面是我的配置:
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, topic + "kafka2");
configProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
configProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 3000000);
configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configProperties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 2000000000);
configProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "900000");
configProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "905000");
configProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "35000");
我已经更改了组id,以避免旧组id出现问题。我当前的spring版本是2.1.2.release。在我的应用程序中,我可以看到我的客户是如何不断地重新连接的
你知道这个问题吗?
1条答案
按热度按时间y1aodyip1#
在谈论“spring版本”时,应该更具体一些。有许多不同版本的spring项目。
我想你的意思是SpringBoot是2.1.2(顺便说一下,当前的2.1.x版本是2.1.6)。
SpringBoot2.1.6将spring用于ApacheKafka2.2.7,后者反过来使用
kafka-clients
2.0.1.我认为使用2.0.xkafka客户机和2.1.0代理不会出现任何问题,但是您可以尝试将kafka客户机版本覆盖到2.1.0,如参考手册中所述。
同样,我建议为启用调试日志记录
apache.kafka
看看它是否能给你提供更多的信息。