我是confluent/kafka的新手,我想从kafka中找到元数据信息我想知道生产商名单主题列表主题的架构信息融合版本是5.0什么类(方法)可以提供这些信息?有相同的RESTAPI吗同时,zookeeper连接也是获取此信息所必需的。
cnh2zyt31#
1) 我不认为kafka代理知道在主题中生成消息的生产者,因此没有用于列出它们的命令行工具。但是,对于这个问题的答案是,您可以通过查看jmx上的MBean来列出生产者。2) 为了列出需要运行的主题:
kafka-topics --zookeeper localhost:2181 --list
否则,如果要使用java客户机列出主题,可以调用 listTopics() 方法 KafkaConsumer .您还可以通过zookeeper获取主题列表
listTopics()
KafkaConsumer
ZkClient zkClient = new ZkClient("zkHost:zkPort"); List<String> topics = JavaConversions.asJavaList(ZkUtils.getAllTopics(zkClient));
3) 要获取主题的架构信息,可以使用schema registry api特别是,您可以通过调用以下命令获取所有主题:
GET /subjects HTTP/1.1 Host: schemaregistry.example.com Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json
应给出与以下类似的响应:
HTTP/1.1 200 OK Content-Type: application/vnd.schemaregistry.v1+json ["subject1", "subject2"]
然后可以获得特定主题的所有版本:
GET /subjects/subject-name/versions HTTP/1.1 Host: schemaregistry.example.com Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json
最后,您可以获得在这个主题下注册的模式的特定版本
GET /subjects/subject_name/versions/1 HTTP/1.1 Host: schemaregistry.example.com Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json
或者只是最新注册的架构:
GET /subjects/subject-name/versions/latest HTTP/1.1 Host: schemaregistry.example.com Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json
为了在java中执行这些操作,您可以准备自己的get请求(请参阅此处的操作方法)或使用confluent的schema registry java客户端。您可以在github repo中看到实现和可用方法。关于zookeeper的问题,请注意zk是kafka的一个要求。Kafka使用zookeeper,所以如果你还没有zookeeper服务器,你需要先启动它。您可以使用kafka打包的方便脚本来获得一个快速而脏的单节点zookeeper示例。
1条答案
按热度按时间cnh2zyt31#
1) 我不认为kafka代理知道在主题中生成消息的生产者,因此没有用于列出它们的命令行工具。但是,对于这个问题的答案是,您可以通过查看jmx上的MBean来列出生产者。
2) 为了列出需要运行的主题:
否则,如果要使用java客户机列出主题,可以调用
listTopics()
方法KafkaConsumer
.您还可以通过zookeeper获取主题列表
3) 要获取主题的架构信息,可以使用schema registry api
特别是,您可以通过调用以下命令获取所有主题:
应给出与以下类似的响应:
然后可以获得特定主题的所有版本:
最后,您可以获得在这个主题下注册的模式的特定版本
或者只是最新注册的架构:
为了在java中执行这些操作,您可以准备自己的get请求(请参阅此处的操作方法)或使用confluent的schema registry java客户端。您可以在github repo中看到实现和可用方法。
关于zookeeper的问题,请注意zk是kafka的一个要求。
Kafka使用zookeeper,所以如果你还没有zookeeper服务器,你需要先启动它。您可以使用kafka打包的方便脚本来获得一个快速而脏的单节点zookeeper示例。