到目前为止,我还没有看到一个python客户机在不使用配置选项自动创建主题的情况下显式实现主题的创建。
3bygqnnd1#
from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=['localhost:9092']) topic = 'topic-name' producer.send(topic, final_list[0]).get(timeout=10)
xiozqbni2#
如果你能跑的话 confluent_kafka (Python) v0.11.6 或以上,则以下是如何 create kafka topics , list kafka topics 以及 delete kafka topics :
confluent_kafka
v0.11.6
create kafka topics
list kafka topics
delete kafka topics
>>> import confluent_kafka.admin, pprint >>> conf = {'bootstrap.servers': 'broker01:9092'} >>> kafka_admin = confluent_kafka.admin.AdminClient(conf) >>> new_topic = confluent_kafka.admin.NewTopic('topic100', 1, 1) # Number-of-partitions = 1 # Number-of-replicas = 1 >>> kafka_admin.create_topics([new_topic,]) # CREATE (a list(), so you can create multiple). {'topic100': <Future at 0x7f524b0f1240 state=running>} # Stdout from above command. >>> pprint.pprint(kafka_admin.list_topics().topics) # LIST {'topic100' : TopicMetadata(topic100, 1 partitions), 'topic99' : TopicMetadata(topic99, 1 partitions), 'topic98' : TopicMetadata(topic98, 1 partitions)}
以及 delete kafka topics 用同样的方法 kafka_admin 对象,这个:
kafka_admin
kafka_admin.delete_topics(['topic99', 'topic100',]) # DELETE
我希望这有帮助(◠﹏◠)/
kqhtkvqz3#
已经太晚了。我不知道显式创建主题的命令,但是下面创建并添加消息。我创建了一个python kafka制作人:
prod = KafkaProducer(bootstrap_servers='localhost:9092') for i in xrange(1000): prod.send('xyz', str(i))
在Kafka的主题列表中 xyz 以前没有。当我执行上述方法时,python kafka客户机创建了它并向其中添加了消息。
xyz
wz3gfoph4#
您可以使用 kafka-python 或者 confluent_kafka 客户端,它是librdkafka的轻量级 Package 器。使用 kafka-python ```from kafka.admin import KafkaAdminClient, NewTopic
kafka-python
admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092",client_id='test')
topic_list = []topic_list.append(NewTopic(name="example_topic", num_partitions=1, replication_factor=1))admin_client.create_topics(new_topics=topic_list, validate_only=False)
使用 `confluent_kafka` ``` from confluent_kafka.admin import AdminClient, NewTopic admin_client = AdminClient({ "bootstrap.servers": "localhost:9092" }) topic_list = [] topic_list.append(NewTopic("example_topic", 1, 1)) admin_client.create_topics(topic_list)
bjp0bcyl5#
似乎没有kafka服务器api来创建主题,因此您必须使用自动创建主题或命令行工具:
bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
bgibtngc6#
看起来您可以使用以下方法来确保您的主题已经存在(我假设您使用的是以下kafka python实现):
client = KafkaClient(...) producer = KafkaProducer(...) client.ensure_topic_exists('my_new_topic') producer.send_messages('my_new_topic', ...)
h22fl7wq7#
kafka0.11中刚刚添加了用于创建和配置编程主题的adminclientapi(最初用于java)看到了吗https://cwiki.apache.org/confluence/display/kafka/kip-117%3a+add+a+public+adminclient+api+for+kafka+admin+operations随着时间的推移,非java客户机库也会添加此功能。请与您正在使用的kafkapython客户机的作者联系(有几个),看看api中是否以及何时会提供kip-4管理协议支持看到了吗https://cwiki.apache.org/confluence/display/kafka/kip-4+-+command+line+and+centralized+administrative+operations
7条答案
按热度按时间3bygqnnd1#
xiozqbni2#
如果你能跑的话
confluent_kafka
(Python)v0.11.6
或以上,则以下是如何create kafka topics
,list kafka topics
以及delete kafka topics
:以及
delete kafka topics
用同样的方法kafka_admin
对象,这个:我希望这有帮助(◠﹏◠)/
kqhtkvqz3#
已经太晚了。我不知道显式创建主题的命令,但是下面创建并添加消息。
我创建了一个python kafka制作人:
在Kafka的主题列表中
xyz
以前没有。当我执行上述方法时,python kafka客户机创建了它并向其中添加了消息。wz3gfoph4#
您可以使用
kafka-python
或者confluent_kafka
客户端,它是librdkafka的轻量级 Package 器。使用
kafka-python
```from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:9092",
client_id='test'
)
topic_list = []
topic_list.append(NewTopic(name="example_topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
bjp0bcyl5#
似乎没有kafka服务器api来创建主题,因此您必须使用自动创建主题或命令行工具:
bgibtngc6#
看起来您可以使用以下方法来确保您的主题已经存在(我假设您使用的是以下kafka python实现):
h22fl7wq7#
kafka0.11中刚刚添加了用于创建和配置编程主题的adminclientapi(最初用于java)
看到了吗https://cwiki.apache.org/confluence/display/kafka/kip-117%3a+add+a+public+adminclient+api+for+kafka+admin+operations
随着时间的推移,非java客户机库也会添加此功能。请与您正在使用的kafkapython客户机的作者联系(有几个),看看api中是否以及何时会提供kip-4管理协议支持
看到了吗https://cwiki.apache.org/confluence/display/kafka/kip-4+-+command+line+and+centralized+administrative+operations