我是Kafka的新手,使用spring boot@kafkalistener创建了consumer。
我的用例是一旦从kafka分区读取了消息,我就需要进行处理,当出现任何异常时,需要在一段时间后重新处理消息。在异常情况下,我不应该更新偏移量,在服务器启动后,我需要再次处理消息。
以下是配置
@Configuration
@EnableKafka
public class ReceiverConfiguration {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(10);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<some broker configuration>");
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "6000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "Test-Consumer-Group");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return propsMap;
}
@Bean
public Listener listener() {
System.out.println("%%%%%%%%% Initializing Listener %%%%%%%");
return new Listener();
}
}
下面是listener类
public class Listener {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
public CountDownLatch getCountDownLatch1() {
return countDownLatch1;
}
private CountDownLatch countDownLatch1 = new CountDownLatch(1);
@KafkaListener(topics = "topic")
public void listen(ConsumerRecord<String, CustomObject> record, Acknowledgment ack) throws Exception{
logger.info("********1 message: "+record);
//ack.acknowledge();
}
}
场景1:在consumer service运行期间,当生产者发送消息时,监听器类读取消息并且不更新偏移量,直到这部分看起来良好为止。如果停止使用者,则在使用者组中更新偏移量。问题:在服务器停止方案期间不应更新偏移量。一旦我的后端处理问题得到解决,当我重新启动消费服务时,我只需要在未提交偏移量时再次使用消息。但是这里提交了偏移量,我不可能再次使用来自分区的消息。
场景2:假设我的消费者服务关闭,生产者将消息发送到主题分区,可以看到偏移量没有增加,延迟为1。使用ack.acknowledge()未启用静态服务,即代码仅被注解掉,即使在使用者组中提交了偏移量。问题:在我确认补偿之前,不应该提交补偿。服务器启动时发现问题。
请帮助我解决问题,未能找到正确的重定向。
谢谢你的帮助
暂无答案!
目前还没有任何答案,快来回答吧!