如何用pykafka创建新的主题,包括分区和复制?

7ajki6be  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(364)

我希望能够使用pykafka以编程方式在kafka中创建一个主题。我知道如果一个主题不存在,访问topicdict会自动创建一个主题,但我不知道如何用它来控制分区/副本的数量。此外,它还有一个讨厌的bug,如果Kafka倒下的话,它会以一个无限循环结束。基本上我想做如下事情:

create_topic('mytopic', partitions=2, replicas=3)
b1payxdu

b1payxdu1#

你可以用它 subprocess 如果你安装Kafka二进制文件,你可以这样做

from pykafka import KafkaClient
import subprocess

client = KafkaClient(hosts="localhost:9092")

subprocess.Popen("PATH/TO/KAFKA/BINARY/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic testtopic --replication-factor 1 --partitions 10".split())
ecfdbz9o

ecfdbz9o2#

pykafka是kafka生产者和消费者api的python实现,您想要实现的是在kafka中使用另一个api,管理/操作api(实际上是一组java类)执行的操作。我认为pykafka没有api/ Package 器。你可能观察到的是一个由Kafka自动创建的主题。您可以使用属性为自动创建的主题配置默认数量的分区和副本。

相关问题