我想创建一个Kafka主题,如果它还不存在。我知道如何通过bash创建主题,但不知道如何检查它是否存在。
topic_exists = ??????
if not topic_exists:
subprocess.call([os.path.join(KAFKABIN, 'kafka-topics.sh'),
'--create',
'--zookeeper', '{}:2181'.format(KAFKAHOST),
'--topic', str(self.topic),
'--partitions', str(self.partitions),
'--replication-factor', str(self.replication_factor)])
3条答案
按热度按时间vs91vp4v1#
另一个好方法是使用python-kafka模块:
kafka\u client.topic\u partitions返回主题列表。
wfsdck302#
你可以用
--list (List all available topics)
的选项kafka-topics.sh
看看是否self.topic
存在于topics
数组,如下所示。根据主题的数量,这种方法可能有点繁重。如果是这样的话,您也许可以使用
--describe (List details for the given topics)
如果主题不存在,则很可能返回空。我还没有彻底测试过这个,所以我不能确定这个溶液有多固体(--describe
)是的,但也许值得你进一步调查。输出:
还有一个代理配置可用,因此您不必执行以下任何步骤:
auto.create.topics.enable | true |启用在服务器上自动创建主题。如果设置为true,则尝试为不存在的主题生成数据或获取元数据时,将使用默认复制因子和分区数自动创建该主题。
如果可能的话,我会采取这种方法。
注意,您应该设置主题配置(
server.properties
)你的经纪人num.partitions
以及default.replication.factor
以匹配代码段中的设置。2exbekwf3#
为此使用kafka python消费api。