我正在尝试以编程方式(java)获取和设置kafka集群中某些主题集的保留时间。似乎不可能使用 org.apache.kafka.clients.admin.AdminClient .除了命令行实用程序,还有其他方法吗?
org.apache.kafka.clients.admin.AdminClient
s3fp2yjn1#
仿效 kafka-configs --entity-type topics --entity-name "topic" --describe ,您应该能够使用 AdminClient#describeConfigs 和这个一样。在这里,我只过滤用户明确定义的配置。如果删除过滤器,您将获得所有主题级和代理级的默认配置
kafka-configs --entity-type topics --entity-name "topic" --describe
AdminClient#describeConfigs
Optional<List<ConfigEntry>> dynamicTopicConfigEntries; try { // given org.apache.kafka.client.admin.AdminClient ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "topic"); dynamicTopicConfigEntries = Optional.of(adminClient.describeConfigs(Collections.singletonList(resource)) .all() .thenApply(configMap -> configMap.get(resource).entries() .stream().filter(e -> e.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) .collect(toList()) ) .get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException("Unable to get topic description"); }
同样,还有一个 --alter 该命令支持的标志(没有可用的代码)另外,kip-248也是值得关注的。
--alter
js5cn81o2#
这就是 kafka.admin.TopicCommand scala类,这是 kafka-topics shell脚本使用kafka二进制发行版:https://github.com/apache/kafka/blob/a421dd2a26ca140f821cd5be1a4f716cf04beb43/core/src/main/scala/kafka/admin/topiccommand.scala#l302-l318级您可以使用它,尽管您需要将kafka包作为项目的依赖项,而不仅仅是kafka客户机。如果您使用的是为scala 2.12编译的kafka 2.1.1:
kafka.admin.TopicCommand
kafka-topics
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.1.1</version> </dependency>
https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.12/2.1.1
2条答案
按热度按时间s3fp2yjn1#
仿效
kafka-configs --entity-type topics --entity-name "topic" --describe
,您应该能够使用AdminClient#describeConfigs
和这个一样。在这里,我只过滤用户明确定义的配置。如果删除过滤器,您将获得所有主题级和代理级的默认配置
同样,还有一个
--alter
该命令支持的标志(没有可用的代码)另外,kip-248也是值得关注的。
js5cn81o2#
这就是
kafka.admin.TopicCommand
scala类,这是kafka-topics
shell脚本使用kafka二进制发行版:https://github.com/apache/kafka/blob/a421dd2a26ca140f821cd5be1a4f716cf04beb43/core/src/main/scala/kafka/admin/topiccommand.scala#l302-l318级
您可以使用它,尽管您需要将kafka包作为项目的依赖项,而不仅仅是kafka客户机。
如果您使用的是为scala 2.12编译的kafka 2.1.1:
https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.12/2.1.1