我试图找到一种方法,在对某个分区执行操作之前,检查该分区是否已分配。我们知道通过检查来发现消费者是否被分配的方法
consumer.assignment().isEmpty
但我将如何做的分区,我的代码如下
List<PartitionInfo> myPartitionInfo = consumer.partitionsFor(..);
mypartitionInfo.stream()
// before coming to this map i need to check if this partition is assigned
.map(part->new TopicPartition(subTopicName,part.partition()))
.......;
有什么办法可以做到吗?请给我建议。
1条答案
按热度按时间j9per5c41#
你已经回答了你自己的问题。使用
consumer.assignment()
否则,在您的消费者轮询循环中,您还可以访问每个记录所来自的分区,并可以在那里进行检查。