我正在使用:https://github.com/mumrah/kafka-python 作为python中的kafkaapi。我要获取指定主题的分区数。我该怎么做?
rmbxnbpk1#
对于使用合流python或enterpriseapi的用户。可以这样做:
def count_partitions(my_partitions) -> int: count = 0 for part in my_partitions: count = count + 1 return count cluster_data: ClusterMetadata = producer.list_topics(topic=TOPIC) topic_data: TopicMetadata = cluster_data.topics[TOPIC] available_partitions: PartitionMetadata = topic_data.partitions print(count_partitions(available_partitions))
hk8txs482#
我在试图解决这个完全相同的问题时发现了这个问题。我知道这个问题由来已久,但下面是我提出的解决方案(使用kazoo与Zookeeper交谈):
from kazoo.client import KazooClient class KafkaInfo(object): def __init__(self, hosts): self.zk = KazooClient(hosts) self.zk.start() def topics(self): return self.zk.get_children('/brokers/topics') def partitions(self, topic): strs = self.zk.get_children('/brokers/topics/%s/partitions' % topic) return map(int, strs) def consumers(self): return self.zk.get_children('/consumers') def topics_for_consumer(self, consumer): return self.zk.get_children('/consumers/%s/offsets' % consumer) def offset(self, topic, consumer, partition): (n, _) = self.zk.get('/consumers/%s/offsets/%s/%d' % (consumer, topic, partition)) return int(n)
bwitn5fc3#
可能是一个稍微简单化的解决方案,但是:
from kafka import KafkaClient client = KafkaClient('SERVER:PORT') topic_partition_ids = client.get_partition_ids_for_topic(b'TOPIC') len(topic_partition_ids)
在python3.4.3/kafka-python0.9.3上测试
3条答案
按热度按时间rmbxnbpk1#
对于使用合流python或enterpriseapi的用户。可以这样做:
hk8txs482#
我在试图解决这个完全相同的问题时发现了这个问题。我知道这个问题由来已久,但下面是我提出的解决方案(使用kazoo与Zookeeper交谈):
bwitn5fc3#
可能是一个稍微简单化的解决方案,但是:
在python3.4.3/kafka-python0.9.3上测试