如何在Kafka中检查分区分配

zzwlnbp8  于 2023-06-21  发布在  Apache
关注(0)|答案(1)|浏览(172)

我试图找到一种方法,在对某个分区执行操作之前,检查该分区是否已分配。我们知道通过检查来发现消费者是否被分配的方法

consumer.assignment().isEmpty

但我将如何做的分区,我的代码如下

List<PartitionInfo> myPartitionInfo = consumer.partitionsFor(..);
mypartitionInfo.stream() 
   // before coming to this map i need to check if this partition is assigned
   .map(part->new TopicPartition(subTopicName,part.partition()))
   .......;

有什么办法可以做到吗?请给我建议。

j9per5c4

j9per5c41#

你已经回答了你自己的问题。使用consumer.assignment()

// Data to check
String topic = "...";
int partition = 0;

// Check it
boolean isAssigned = consumer.assignment()
  .stream()
  .anyMatch(tp -> tp.topic().equals(topic) && tp.partition() == partition)

否则,在您的消费者轮询循环中,您还可以访问每个记录所来自的分区,并可以在那里进行检查。

相关问题