kafka管理员:如何以编程方式显示和设置每个主题的保留时间?

agxfikkp  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(704)

我正在尝试以编程方式(java)获取和设置kafka集群中某些主题集的保留时间。
似乎不可能使用 org.apache.kafka.clients.admin.AdminClient .
除了命令行实用程序,还有其他方法吗?

s3fp2yjn

s3fp2yjn1#

仿效 kafka-configs --entity-type topics --entity-name "topic" --describe ,您应该能够使用 AdminClient#describeConfigs 和这个一样。
在这里,我只过滤用户明确定义的配置。如果删除过滤器,您将获得所有主题级和代理级的默认配置

Optional<List<ConfigEntry>> dynamicTopicConfigEntries;

try {
    // given org.apache.kafka.client.admin.AdminClient
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "topic");

    dynamicTopicConfigEntries = Optional.of(adminClient.describeConfigs(Collections.singletonList(resource))
            .all()
            .thenApply(configMap -> configMap.get(resource).entries()
                    .stream().filter(e -> e.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
                    .collect(toList())
            )
            .get());
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException("Unable to get topic description");
}

同样,还有一个 --alter 该命令支持的标志(没有可用的代码)
另外,kip-248也是值得关注的。

js5cn81o

js5cn81o2#

这就是 kafka.admin.TopicCommand scala类,这是 kafka-topics shell脚本使用kafka二进制发行版:
https://github.com/apache/kafka/blob/a421dd2a26ca140f821cd5be1a4f716cf04beb43/core/src/main/scala/kafka/admin/topiccommand.scala#l302-l318级
您可以使用它,尽管您需要将kafka包作为项目的依赖项,而不仅仅是kafka客户机。
如果您使用的是为scala 2.12编译的kafka 2.1.1:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.1</version>
</dependency>

https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.12/2.1.1

相关问题