当我暂停某个消费者时,来自同一消费者组的其他消费者应该会收到消息

lndjwyie  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(470)

我正在尝试一个案例,当有两个或更多的消费者,听一个共同的主题只包含一个分区。我暂停了其中一个消费者,在这段时间内,其他没有暂停的消费者应该能够从主题中获取消息。我观察到,只有当暂停的消费者恢复时,其他活跃消费者中的一个才能接收消息。我怎样才能通过 Spring Kafka实现这一点。我用的是SpringKafka2.0.0.m2。
下面是我的代码行

public class listener{
    @KafkaListener(id = "id2", topics = "abcd", group = "group1", containerFactory = "kafkaListenerContainerFactory")
        public void listenPartition1(String data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) Long offsets, Acknowledgment acknowledgment, Consumer consumer) throws InterruptedException {
            count_consumer2 = count_consumer2 + 1;
            if(count_consumer2 == 10) {
                consumer.pause(consumer.assignment());
            }
            acknowledgment.acknowledge();

    }

    @KafkaListener(id = "id1", topics = "abcd", group = "group1", containerFactory = "kafkaListenerContainerFactory")
        public void listenPartition1(String data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) Long offsets, Acknowledgment acknowledgment, Consumer consumer) throws InterruptedException {
            count_consumer1 = count_consumer1 + 1;
            if(count_consumer1 == 5) {
                consumer.pause(consumer.assignment());
                while(data.database <10){
                      consumer.resume(consumer.assignment());
                 }
            }

             acknowledgment.acknowledge();

        }
}
7uhlpewt

7uhlpewt1#

checkout pause()方法文档:
暂停从请求的分区获取。在使用resume(collection)恢复这些分区之前,将来对poll(long)的调用不会返回这些分区中的任何记录。
请注意,此方法不影响分区订阅。特别是,当使用自动分配时,它不会导致组重新平衡。
pause方法不会触发分区重新平衡,因此此组中的其他使用者不会使用此分区,请注意,同一使用者组中只有一个使用者将使用分区。因此,在您的情况下,只有一个使用者工作,其他使用者处于空闲状态。

fhg3lkii

fhg3lkii2#

您应该重新划分您的主题,使其至少具有与活动的互斥使用者(即相同的group.id)一样多的分区。Kafka就是这样设计的。

相关问题