kafka使用spring-boot手动更新偏移量

eiee3dmh  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(216)

我是Kafka的新手,使用spring boot@kafkalistener创建了consumer。
我的用例是一旦从kafka分区读取了消息,我就需要进行处理,当出现任何异常时,需要在一段时间后重新处理消息。在异常情况下,我不应该更新偏移量,在服务器启动后,我需要再次处理消息。
以下是配置

@Configuration
@EnableKafka

public class ReceiverConfiguration {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<some broker configuration>");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "6000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "Test-Consumer-Group");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }

    @Bean
    public Listener listener() {
        System.out.println("%%%%%%%%% Initializing Listener %%%%%%%");
        return new Listener();
    }
}

下面是listener类

public class Listener {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);

    public CountDownLatch getCountDownLatch1() {
        return countDownLatch1;
    }

    private CountDownLatch countDownLatch1 = new CountDownLatch(1);

    @KafkaListener(topics = "topic")
    public void listen(ConsumerRecord<String, CustomObject> record, Acknowledgment ack) throws Exception{
        logger.info("********1 message: "+record);
        //ack.acknowledge();
    }
}

场景1:在consumer service运行期间,当生产者发送消息时,监听器类读取消息并且不更新偏移量,直到这部分看起来良好为止。如果停止使用者,则在使用者组中更新偏移量。问题:在服务器停止方案期间不应更新偏移量。一旦我的后端处理问题得到解决,当我重新启动消费服务时,我只需要在未提交偏移量时再次使用消息。但是这里提交了偏移量,我不可能再次使用来自分区的消息。
场景2:假设我的消费者服务关闭,生产者将消息发送到主题分区,可以看到偏移量没有增加,延迟为1。使用ack.acknowledge()未启用静态服务,即代码仅被注解掉,即使在使用者组中提交了偏移量。问题:在我确认补偿之前,不应该提交补偿。服务器启动时发现问题。
请帮助我解决问题,未能找到正确的重定向。
谢谢你的帮助

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题