我需要知道Kafka记录所指向的Kafka主题的分区号。在执行
producer.send(record);
有没有办法知道那条记录去了哪个分区?
xiozqbni1#
恐怕不可能使用默认的循环分区器。如果您指定一个键,您可以从producer源代码获得默认算法,并尝试预测(这有点像 hash(key) % num.partitions )如果您使用自定义分区器,您将处理它并已经知道。我想知道producerinterceptor,但它在分配分区之前提供了信息,您可以从doc中看到。https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/producer/producerinterceptor.html
hash(key) % num.partitions
gopyfrb32#
我在这里看到三种选择:或者在producerrecord中指定分区,如producerrecord类的构造函数中所示定义自定义分区器,如另一篇文章中所示利用 AdminClient api调用describetopicsresult以获取特定主题的分区数信息,然后重新应用kafka使用的默认分区器逻辑:
AdminClient
kafka.common.utils.Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
2条答案
按热度按时间xiozqbni1#
恐怕不可能使用默认的循环分区器。如果您指定一个键,您可以从producer源代码获得默认算法,并尝试预测(这有点像
hash(key) % num.partitions
)如果您使用自定义分区器,您将处理它并已经知道。我想知道producerinterceptor,但它在分配分区之前提供了信息,您可以从doc中看到。https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/producer/producerinterceptor.html
gopyfrb32#
我在这里看到三种选择:
或者在producerrecord中指定分区,如producerrecord类的构造函数中所示
定义自定义分区器,如另一篇文章中所示
利用
AdminClient
api调用describetopicsresult以获取特定主题的分区数信息,然后重新应用kafka使用的默认分区器逻辑: