如何在Kafka的剧本中正确地删除并创建主题?

zc0qhyus  于 2022-11-21  发布在  Apache
关注(0)|答案(2)|浏览(139)

我正在编写一个脚本来刷新我在AWS管理的Kafka集群上的主题。每当我运行脚本时,我需要清除现有数据,我通过删除和再次创建相同的主题来完成。我希望脚本在我重复运行时打印出成功删除和成功创建的结果。但删除/创建每隔一次运行都会失败,这让我感到困惑。
下面是我的脚本:

# manage_topics.py
import sys
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import KafkaError, KafkaException
if __name__ == '__main__':
    kafka_cfg = '.....' # omitted  
    admin_client = AdminClient(kafka_cfg)

    deletion_ret = admin_client.delete_topics(['my-test-topic1'])
    for topic, delete_fut in deletion_ret.items():
        try:
            status = delete_fut.result()
            print(f'{topic} deletion is successful. status={status}')
        except KafkaException as e:
            print(f'could not delete topic: {topic}, error: {str(e)}')
            if e.args[0].code() != KafkaError.UNKNOWN_TOPIC_OR_PART:
                print('exiting...')
                sys.exit(1)
            else:
                print('ignoring UNKNOWN_TOPIC_OR_PART error')

    # I have two brokers for the Kafka instance I was given
    creation_ret = admin_client.create_topics([NewTopic('my-test-topic1', 5, 2)])
    for topic, create_fut in creation_ret.items():
        try:
            status = create_fut.result()
            print(f'{topic} creation is successful. status={status}')
        except KafkaException as e:
            print(f'could not create topic: {topic}, error: {str(e)}')

这是它生成的日志。我在每次运行之间等待了多长时间并不重要。对我来说,似乎当一个成功的删除之后是一个创建时,删除需要时间,所以下面的创建会失败。当我再次运行它时,前一个删除会完成,然后当前的删除会失败,创建会成功。
如果有人能帮助我理解和改进这个脚本,我将非常感激。

$ python manage_topics.py
could not delete topic: my-test-topic1, error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ignoring UNKNOWN_TOPIC_OR_PART error
my-test-topic1 creation is successful. status=None
$
$
$ python manage_topics.py
my-test-topic1 deletion is successful. status=None
could not create topic: my-test-topic1, error: KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'my-test-topic1' already exists."}
$
$
$ python manage_topics.py
could not delete topic: my-test-topic1, error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ignoring UNKNOWN_TOPIC_OR_PART error
my-test-topic1 creation is successful. status=None
$
$
$ python manage_topics.py
my-test-topic1 deletion is successful. status=None
could not create topic: my-test-topic1, error: KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'my-test-topic1' already exists."}
$
$
vmdwslir

vmdwslir1#

主题删除是服务器端的异步操作。您将来的结果仅捕获请求的响应(主题被 * 标记 * 为删除),而不是实际集群删除 * 所有副本 *。
相关Kafka - delete a topic completely

bd1hkmkf

bd1hkmkf2#

有同样的问题。
主题删除命令由脚本发出,但实际删除主题需要一段时间。
这就是为什么它 * 每隔一次运行 * 就有效:第一次运行时会发出删除命令,但在进入下一个脚本命令之前不会完成。当您第二次运行该脚本时,该主题已经被删除。
据我所知,没有办法等待admin_client.delete_topics完成,但如果有,请告诉我。
我通过在admin_client.delete_topics后面添加一个“sleep”来解决这个问题。

相关问题