我们希望通过spring kafka列出所有kafka主题,以获得类似于kafka命令的结果:
bin/kafka-topics.sh --list --zookeeper localhost:2181
在下面的服务中运行gettopics()方法时,我们得到org.apache.kafka.common.errors.timeoutexception:获取主题元数据时超时
配置:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
服务:
@Service
public class TopicServiceKafkaImpl implements TopicService {
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@Override
public Set<String> getTopics() {
try (Consumer<String, String> consumer =
consumerFactory.createConsumer()) {
Map<String, List<PartitionInfo>> map = consumer.listTopics();
return map.keySet();
}
}
Kafka正在运行,我们可以从我们的应用程序发送消息到一个主题成功。
3条答案
按热度按时间zy1mlcev1#
kafka-topics --list
是一个shell脚本,它只是一个 Package 器kafka.admin.TopicCommand
类,从中可以找到要查找的方法或者,也可以使用
AdminClient#listTopics
方法a2mppw5e2#
您正在连接zookeeper(2181)而不是kafka(默认情况下是9092)。
javakafka客户机不再直接与zk对话。
b4wnujal3#
您可以使用管理客户端列出这样的主题