我正在尝试一个案例,当有两个或更多的消费者,听一个共同的主题只包含一个分区。我暂停了其中一个消费者,在这段时间内,其他没有暂停的消费者应该能够从主题中获取消息。我观察到,只有当暂停的消费者恢复时,其他活跃消费者中的一个才能接收消息。我怎样才能通过 Spring Kafka实现这一点。我用的是SpringKafka2.0.0.m2。
下面是我的代码行
public class listener{
@KafkaListener(id = "id2", topics = "abcd", group = "group1", containerFactory = "kafkaListenerContainerFactory")
public void listenPartition1(String data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) Long offsets, Acknowledgment acknowledgment, Consumer consumer) throws InterruptedException {
count_consumer2 = count_consumer2 + 1;
if(count_consumer2 == 10) {
consumer.pause(consumer.assignment());
}
acknowledgment.acknowledge();
}
@KafkaListener(id = "id1", topics = "abcd", group = "group1", containerFactory = "kafkaListenerContainerFactory")
public void listenPartition1(String data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) Long offsets, Acknowledgment acknowledgment, Consumer consumer) throws InterruptedException {
count_consumer1 = count_consumer1 + 1;
if(count_consumer1 == 5) {
consumer.pause(consumer.assignment());
while(data.database <10){
consumer.resume(consumer.assignment());
}
}
acknowledgment.acknowledge();
}
}
2条答案
按热度按时间7uhlpewt1#
checkout pause()方法文档:
暂停从请求的分区获取。在使用resume(collection)恢复这些分区之前,将来对poll(long)的调用不会返回这些分区中的任何记录。
请注意,此方法不影响分区订阅。特别是,当使用自动分配时,它不会导致组重新平衡。
pause方法不会触发分区重新平衡,因此此组中的其他使用者不会使用此分区,请注意,同一使用者组中只有一个使用者将使用分区。因此,在您的情况下,只有一个使用者工作,其他使用者处于空闲状态。
fhg3lkii2#
您应该重新划分您的主题,使其至少具有与活动的互斥使用者(即相同的group.id)一样多的分区。Kafka就是这样设计的。