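I am writing a Kafka consumer. I have set the acknowledgment mode to manual, so whenever the consumer fails to process a message, I do not acknowledge it. I expect the consumer to reprocess the failed message immediately, but it does not.
This is my consumer configuration class.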
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAPSERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "${crmdsforecast.judjement-consumer-groupId}");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckOnError(false);
    SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
        System.out.println("Error while processing the record {}" + exception.getCause().getMessage());
    }, new FixedBackOff(3000L, 2L));
    factory.setErrorHandler(errorHandler);
    return factory;
}
My consumer method looks like this.
@KafkaListener(containerFactory = "kafkaListenerContainerFactory", id = "${id}", topics = "${topicname}")
public void consume(String message, Acknowledgment acknowledgment) {
    Dto payload = new Dto();
    try {
        payload = payloadDeserializer.convertIntoDtoObject(message);
        if (payload != null)
            //Do Something;
            acknowledgment.acknowledge();
    }
    catch (JsonProcessingException e) {
        log.error("Error occured while Deserializing the String input {}", message);
        acknowledgment.acknowledge();
    }
    catch (Exception e) {
        log.error("Some error occured while updating revenueLines {}", e.getMessage());
        //Here I expect that if error comes consumer should reread the message.
    }
}
I have tried several solutions, including SeekToCurrentBatchErrorHandler. I changed the containerFactory method as follows.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    SeekToCurrentBatchErrorHandler errorHandler = new SeekToCurrentBatchErrorHandler();
    errorHandler.setBackOff(new ExponentialBackOff(500L, 2L));
    factory.setBatchErrorHandler(errorHandler);
    factory.setBatchListener(true);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
I also tried DefaultErrorHandler: I created a separate bean and set it as the commonErrorHandler, but that did not work either.
@Bean
public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(1000, 3);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
        System.out.println(String.format("consumed record %s because this exception was thrown", consumerRecord.toString(), e.getClass().getName()));
    }, fixedBackOff);
    errorHandler.addNotRetryableExceptions(NullPointerException.class);
    return errorHandler;
}
Can anyone help?
1 Answer
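Kafka maintains two pointers: the committed offset and the current position. They are unrelated, except that when a consumer first starts, its position is set to the committed offset.
Not acknowledging a record does not cause it to be redelivered, because skipping the acknowledgment does not change the position.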
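To make the two pointers concrete, here is a minimal sketch in plain Java. It is a toy model, not the real KafkaConsumer API: `ToyConsumer`, `OffsetPointersDemo`, and their methods are hypothetical names invented for illustration. It shows that polling advances the position whether or not anything was committed, and that only an explicit seek brings an earlier record back.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a consumer's state for one partition (hypothetical, not the Kafka client API).
class ToyConsumer {
    private final List<String> log;
    private int position = 0;   // offset of the next record poll() will return
    private int committed = 0;  // offset recorded by the last commit

    ToyConsumer(List<String> log) {
        this.log = log;
    }

    // Returns the next record and advances the position, regardless of commits.
    String poll() {
        return position < log.size() ? log.get(position++) : null;
    }

    // Records the current position as the committed offset.
    void commit() {
        committed = position;
    }

    // Moves the position so the next poll() returns the record at this offset.
    void seek(int offset) {
        position = offset;
    }

    int committed() {
        return committed;
    }
}

class OffsetPointersDemo {
    // Polls twice without committing, then seeks back to the committed offset and polls again.
    static List<String> run() {
        ToyConsumer consumer = new ToyConsumer(List.of("a", "b", "c"));
        List<String> seen = new ArrayList<>();
        seen.add(consumer.poll());            // "a", never committed
        seen.add(consumer.poll());            // "b": skipping the commit did not bring "a" back
        consumer.seek(consumer.committed());  // explicit seek back to offset 0
        seen.add(consumer.poll());            // "a" again
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a, b, a]
    }
}
```

In this model the uncommitted record "a" only reappears after the seek, which mirrors why withholding the ack alone does nothing in the question's setup.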
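You are catching the exception.
You need to throw the exception to the container; the default error handler will then perform a seek to reposition the partition and redeliver the record.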
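The difference between swallowing and propagating the exception can be sketched with a toy delivery loop in plain Java. This is a simplified simulation under stated assumptions, not Spring for Apache Kafka's actual container code: `ToyContainer`, `RethrowDemo`, and the retry-then-skip policy are hypothetical stand-ins for the container and its error handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-in for a listener container with a seek-on-error policy (hypothetical).
class ToyContainer {
    // Delivers each record to the listener. If the listener throws, the position is
    // left unchanged so the same record is delivered again, up to maxRetries retries.
    static List<String> deliver(List<String> records, Consumer<String> listener, int maxRetries) {
        List<String> deliveries = new ArrayList<>();
        int position = 0;
        int failures = 0;
        while (position < records.size()) {
            String record = records.get(position);
            deliveries.add(record);
            try {
                listener.accept(record);
                position++;       // success: move on to the next record
                failures = 0;
            } catch (RuntimeException e) {
                failures++;
                if (failures > maxRetries) {
                    position++;   // retries exhausted: skip the record
                    failures = 0;
                }
                // otherwise the position stays put, so the record is redelivered
            }
        }
        return deliveries;
    }
}

class RethrowDemo {
    // Hypothetical business logic that fails for one specific record.
    static void process(String record) {
        if (record.equals("bad")) {
            throw new IllegalStateException("cannot process " + record);
        }
    }

    // Listener that catches everything, like the consume method in the question.
    static List<String> swallowing() {
        return ToyContainer.deliver(List.of("ok", "bad", "next"), r -> {
            try {
                process(r);
            } catch (RuntimeException e) {
                // logged and swallowed: the container never learns about the failure
            }
        }, 2);
    }

    // Listener that lets the exception propagate to the container.
    static List<String> rethrowing() {
        return ToyContainer.deliver(List.of("ok", "bad", "next"), RethrowDemo::process, 2);
    }

    public static void main(String[] args) {
        System.out.println(swallowing()); // prints [ok, bad, next]
        System.out.println(rethrowing()); // prints [ok, bad, bad, bad, next]
    }
}
```

Applied to the question's listener, the analogous change would be to rethrow from the `catch (Exception e)` block, or remove that block, so that the configured error handler sees the failure.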
See the Spring for Apache Kafka documentation.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling