java中如何从kafka服务器获取主题列表

g9icjywg  于 2021-06-07  发布在  Kafka
关注(0)|答案(6)|浏览(478)

我正在使用 kafka 0.8 版本和非常新的。
我想知道在中创建的主题列表 kafka server 以及它的元数据。有什么api可以找到这个吗?
基本上,我需要编写一个java消费者,它应该自动发现 kafka server 。有api要获取 TopicMetadata ,但这需要主题名称作为输入参数。我需要服务器中所有主题的信息。

cgyqldqp

cgyqldqp1#

如果你想从zookeeper获取经纪人或其他Kafka信息 kafka.utils.ZkUtils 提供了一个很好的界面。下面是我必须列出所有zookeeper代理的代码(还有很多其他方法):

List<Broker> listBrokers() {

        final ZkConnection zkConnection = new ZkConnection(connectionString);
        final int sessionTimeoutMs = 10 * 1000;
        final int connectionTimeoutMs = 20 * 1000;
        final ZkClient zkClient = new ZkClient(connectionString,
                                               sessionTimeoutMs,
                                               connectionTimeoutMs,
                                               ZKStringSerializer$.MODULE$);

        final ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);

        scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
}
shyt4zoc

shyt4zoc2#

一个好的起点是kafka附带的shell脚本示例。在发行版的/bin目录中,可以使用一些shell脚本,其中之一是./kafka-topic-list.sh如果运行该脚本而不指定主题,它将返回所有主题及其元数据。请参见:https://github.com/apache/kafka/blob/0.8/bin/kafka-list-topic.sh
shell脚本依次运行:https://github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/admin/listtopiccommand.scala
以上都是对0.8kafka版本的引用,因此如果您使用的是不同的版本(甚至是点差异),请确保在github上使用适当的分支/标记

ldxq2e6h

ldxq2e6h3#

您可以使用zookeeper api获取代理列表,如下所述:

ZooKeeper zk = new ZooKeeper("zookeeperhost, 10000, null);
    List<String> ids = zk.getChildren("/brokers/ids", false);
    List<Map> brokerList = new ArrayList<>();
    ObjectMapper objectMapper = new ObjectMapper();

    for (String id : ids) {
        Map map = objectMapper.readValue(zk.getData("/brokers/ids/" + id, false, null), Map.class);
        brokerList.add(map);
    }

使用此代理列表可以使用以下链接获取所有主题
https://cwiki.apache.org/confluence/display/kafka/finding+topic+and+partition+leader

plupiseo

plupiseo4#

使用scala:

import java.util.{Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

object KafkaTest {
  def main(args: Array[String]): Unit = {

    val brokers = args(0)
    val props = new Properties();
    props.put("bootstrap.servers", brokers);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    val consumer = new KafkaConsumer[String, String](props);
    val topics = consumer.listTopics().keySet();

    println(topics)
  }
}
plicqrtu

plicqrtu5#

Kafka0.9.0
您可以使用提供的使用者方法listtopics()列出服务器中的主题;
如。

Map<String, List<PartitionInfo> > topics;

Properties props = new Properties();
props.put("bootstrap.servers", "1.2.3.4:9092");
props.put("group.id", "test-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
topics = consumer.listTopics();
consumer.close();
xqk2d5yq

xqk2d5yq6#

我认为这是最好的方法:

ZkClient zkClient = new ZkClient("zkHost:zkPort");
List<String> topics = JavaConversions.asJavaList(ZkUtils.getAllTopics(zkClient));

相关问题