遗憾的是,Kafka的Admin.deleteTopics
API只有在收到请求后才返回--这只意味着主题被集群计划删除,但不一定现在删除。
为了说明这一点,下面的代码通常会抛出:
final var newTopic = new NewTopic("aaa", Optional.empty(), Optional.empty());
this.admin.createTopics(Collections.singleton(newTopic), opt).all().get();
this.admin.deleteTopics(Arrays.asList("aaa")).all().get();
this.admin.listTopics( ).names().get().contains("aaa"); // Returns 'false'.
this.admin.createTopics(Collections.singleton(newTopic), opt).all().get(); // <- throws
但有一个例外:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'aaa' is marked for deletion.
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
...
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'aaa' is marked for deletion.
不幸的是,Admin.listTopics()
在这里没有帮助,主题在提交删除请求后不再可见。
所以问题是--有没有什么编程方法(最好是API)可以让我们监控这个主题是否真的消失了?
使用的Kafka(客户端和服务器)版本为3.2。
2条答案
按热度按时间5gfr0r5j1#
到目前为止,我还没有在API中发现任何有趣的东西,但我有一些可能的想法,不幸的是它们都很难看。
1.每个主题似乎都有多个指标--与其大小相关的Bean,例如
kafka.log:type=Log,name=LogStartOffset,topic=aaa,partition=0
。这样我们就可以等到它们消失。然而,为了完全确定,我们可能需要检查所有代理(或至少:所有托管副本的代理)。1.Broker示例似乎有一个名为
kafka.controller:type=KafkaController,name=TopicsToDeleteCount
的Bean,这可能是我们想要的,但没有细粒度控制。而且,我们可能仍然需要检查集群中的所有代理,并等待它们全部达到0。1.看看ZooKeeper/Kraft的内部结构。有趣的部分可能出现在
/brokers/topics
和/admin/delete_topics
等路径上,但这意味着带来另一个(ZooKeeper)依赖项。卡夫也是一个类似的案例,我们会深入挖掘经纪商的内部信息。y53ybaqx2#
在删除主题后创建
foo
主题,这可能会加快Kafka的删除速度。然后创建有用的主题。我的测试发现有一定的效果。