通过SpringKafka列出kafka主题

jhiyze9q  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(356)

我们希望通过spring kafka列出所有kafka主题,以获得类似于kafka命令的结果:

bin/kafka-topics.sh --list --zookeeper localhost:2181

在下面的服务中运行gettopics()方法时,我们得到org.apache.kafka.common.errors.timeoutexception:获取主题元数据时超时
配置:

@EnableKafka
@Configuration
public class KafkaConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
}

服务:

@Service
public class TopicServiceKafkaImpl implements TopicService {
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    @Override
    public Set<String> getTopics() {
        try (Consumer<String, String> consumer = 
            consumerFactory.createConsumer()) {
            Map<String, List<PartitionInfo>> map = consumer.listTopics();
            return map.keySet();
    }
}

Kafka正在运行,我们可以从我们的应用程序发送消息到一个主题成功。

zy1mlcev

zy1mlcev1#

kafka-topics --list 是一个shell脚本,它只是一个 Package 器 kafka.admin.TopicCommand 类,从中可以找到要查找的方法
或者,也可以使用 AdminClient#listTopics 方法

a2mppw5e

a2mppw5e2#

您正在连接zookeeper(2181)而不是kafka(默认情况下是9092)。
javakafka客户机不再直接与zk对话。

b4wnujal

b4wnujal3#

您可以使用管理客户端列出这样的主题

Properties properties = new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    AdminClient adminClient = AdminClient.create(properties);

    ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
    listTopicsOptions.listInternal(true);

    System.out.println("topics:" + adminClient.listTopics(listTopicsOptions).names().get());

相关问题