检查python中是否存在kafka主题

xxls0lw8  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(424)

我想创建一个Kafka主题,如果它还不存在。我知道如何通过bash创建主题,但不知道如何检查它是否存在。

topic_exists = ??????
if not topic_exists:
    subprocess.call([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--create',  
        '--zookeeper', '{}:2181'.format(KAFKAHOST),
        '--topic', str(self.topic), 
        '--partitions', str(self.partitions),
        '--replication-factor', str(self.replication_factor)])
vs91vp4v

vs91vp4v1#

另一个好方法是使用python-kafka模块:

kafka_client = kafka.KafkaClient(kafka_server_name)
server_topics = kafka_client.topic_partitions

if topic_name in server_topics:
   your code....

kafka\u client.topic\u partitions返回主题列表。

wfsdck30

wfsdck302#

你可以用 --list (List all available topics) 的选项 kafka-topics.sh 看看是否 self.topic 存在于 topics 数组,如下所示。
根据主题的数量,这种方法可能有点繁重。如果是这样的话,您也许可以使用 --describe (List details for the given topics) 如果主题不存在,则很可能返回空。我还没有彻底测试过这个,所以我不能确定这个溶液有多固体( --describe )是的,但也许值得你进一步调查。

wanted_topics = ['host_updates_queue', 'foo_bar']

topics = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--list',
        '--zookeeper', '{}:2181'.format(KAFKAHOST)])

for wanted in wanted_topics:
    if wanted in topics:
        print '\'{}\' topic exists!'.format(wanted)
    else:
        print '\'{}\' topic does NOT exist!'.format(wanted)

    topic_desc = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--describe',
        '--topic', wanted,
        '--zookeeper', '{}:2181'.format(KAFKAHOST)])

    if not topic_desc:
        print 'No description found for the topic \'{}\''.format(wanted)

输出:

root@dev:/opt/kafka/kafka_2.10-0.8.2.1# ./t.py
'host_updates_queue' topic exists!
'foo_bar' topic does NOT exist!
No description found for the topic 'foo_bar'

还有一个代理配置可用,因此您不必执行以下任何步骤:
auto.create.topics.enable | true |启用在服务器上自动创建主题。如果设置为true,则尝试为不存在的主题生成数据或获取元数据时,将使用默认复制因子和分区数自动创建该主题。
如果可能的话,我会采取这种方法。
注意,您应该设置主题配置( server.properties )你的经纪人 num.partitions 以及 default.replication.factor 以匹配代码段中的设置。

2exbekwf

2exbekwf3#

为此使用kafka python消费api。

import kafka 
consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=your_server_list) 
new_topics = set(wanted_topics)-set(consumer.topics())
for topic in new_topics:
    create(topic)

相关问题