Kafka consumer does not retry messages that failed processing

umuewwlo  posted on 2023-03-07  in Apache

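I am writing a Kafka consumer. I have set the acknowledgment mode to manual, so whenever the consumer fails to process a message I do not ack it. I expected the consumer to reprocess the failed message immediately, but it does not.
This is what my consumer configuration class looks like.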

public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAPSERVERS);
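        // Note: a plain string literal is not resolved as a property placeholder; inject the value (e.g. with @Value) if a configured group id is intended.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "${crmdsforecast.judjement-consumer-groupId}");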
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest" );
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
   
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            System.out.println("Error while processing the record: " + exception.getCause().getMessage());
        }, new FixedBackOff(3000L, 2L));
        factory.setErrorHandler(errorHandler);
        
        return factory;
    }

My consumer method looks like this.

@KafkaListener( containerFactory = "kafkaListenerContainerFactory",id = "${id}", topics = "${topicname}")
    public void consume(String message,Acknowledgment acknowledgment){
        Dto payload = new Dto();
        try{
            payload = payloadDeserializer.convertIntoDtoObject(message);
            
            if(payload != null) {
                //Do Something;
            }

            acknowledgment.acknowledge();
        }
        catch(JsonProcessingException e){
            log.error("Error occurred while deserializing the String input {}",message);
            acknowledgment.acknowledge();
        }
        catch(Exception e){
            log.error("Some error occurred while updating revenueLines {}",e.getMessage());
            //Here I expect the consumer to re-read the message if an error occurs.
        }
    }

I tried several solutions, including SeekToCurrentBatchErrorHandler. I changed the containerFactory method as follows.

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
   
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        SeekToCurrentBatchErrorHandler errorHandler = new SeekToCurrentBatchErrorHandler();
        errorHandler.setBackOff( new ExponentialBackOff(500L, 2L));
        factory.setBatchErrorHandler(errorHandler);
        factory.setBatchListener(true);
        factory.setConsumerFactory(consumerFactory());
        
        return factory;
    }

I also tried DefaultErrorHandler, creating a separate bean and setting it as the commonErrorHandler, but that did not work either.

@Bean
    public DefaultErrorHandler errorHandler() {
        BackOff fixedBackOff = new FixedBackOff(1000, 3);
        DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
            System.out.println(String.format("consumed record %s because this exception was thrown: %s",consumerRecord.toString(),e.getClass().getName()));
        }, fixedBackOff);
        errorHandler.addNotRetryableExceptions(NullPointerException.class);
        return errorHandler;
    }

Can anyone help?

p8ekf7hl  #1

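Kafka maintains two pointers: the committed offset and the current position. They are unrelated, except that the position is set to the committed offset when the consumer first starts.
Not acknowledging a record does not cause it to be redelivered, because the position does not change.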
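To make the two pointers concrete, here is a toy, stdlib-only model of a single partition. `OffsetDemo` and its `poll`, `commit`, and `seek` methods are hypothetical names that only loosely mirror the real consumer; treat it as a sketch of the idea, not as the Kafka client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy single-partition model: NOT the Kafka client API, only an illustration.
class OffsetDemo {
    private final List<String> log;
    private long committed = 0; // where a restarted consumer would resume
    private long position = 0;  // what the next poll() will read

    OffsetDemo(List<String> log) { this.log = log; }

    // Returns the record at the current position and advances the position,
    // or null once the end of the log is reached.
    String poll() {
        return position < log.size() ? log.get((int) position++) : null;
    }

    // Acknowledging only moves the committed offset; the position is untouched.
    void commit(long nextOffset) { committed = nextOffset; }

    // Only an explicit seek moves the position backwards.
    void seek(long offset) { position = offset; }

    long committed() { return committed; }
    long position() { return position; }

    // Polls twice without ever committing.
    static List<String> pollTwiceWithoutAck() {
        OffsetDemo c = new OffsetDemo(Arrays.asList("a", "b", "c"));
        List<String> seen = new ArrayList<>();
        seen.add(c.poll());
        seen.add(c.poll());
        return seen;
    }

    // Polls, seeks back to the record that "failed", and polls again.
    static List<String> pollSeekPoll() {
        OffsetDemo c = new OffsetDemo(Arrays.asList("a", "b", "c"));
        List<String> seen = new ArrayList<>();
        seen.add(c.poll());
        c.seek(0);
        seen.add(c.poll());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(pollTwiceWithoutAck());
        System.out.println(pollSeekPoll());
    }
}
```

In this model, polling twice without committing yields `a` and then `b`, whereas polling, seeking back to offset 0, and polling again yields `a` twice. Skipping the ack leaves the position where it was, so nothing is re-read.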
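You are catching the exceptions.
You need to throw the exception to the container; the default error handler will then perform seeks to reposition the partitions and redeliver the record.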
See the Spring for Apache Kafka documentation.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling
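The difference between swallowing and rethrowing can be sketched with a simplified container loop. This is a model under stated assumptions, not Spring's implementation: `RetryDemo`, `run`, and the two listeners are hypothetical, back-off delays are omitted, and "recovery" simply skips the record once retries are exhausted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of a listener container; NOT Spring's implementation.
class RetryDemo {
    // Delivers each record to the listener and returns the delivery history.
    // If the listener throws, the position is left on the failed record so it
    // is delivered again, up to maxRetries extra attempts; then it is skipped.
    static List<String> run(List<String> records, Consumer<String> listener, int maxRetries) {
        List<String> deliveries = new ArrayList<>();
        int position = 0;
        int failures = 0;
        while (position < records.size()) {
            String record = records.get(position);
            deliveries.add(record);
            try {
                listener.accept(record);
                position++;   // success: move on to the next record
                failures = 0;
            } catch (RuntimeException e) {
                failures++;
                if (failures > maxRetries) {
                    position++;   // retries exhausted: "recover" by skipping
                    failures = 0;
                }
                // otherwise the position stays put, like a seek back to the failed offset
            }
        }
        return deliveries;
    }

    // Hypothetical business logic that fails for one particular record.
    static void process(String record) {
        if (record.equals("bad")) {
            throw new IllegalStateException("cannot process " + record);
        }
    }

    // Mirrors the listener in the question: the exception never reaches the container.
    static void swallowingListener(String record) {
        try {
            process(record);
        } catch (RuntimeException e) {
            // logged and swallowed
        }
    }

    // Lets the exception propagate so the container can redeliver.
    static void rethrowingListener(String record) {
        process(record);
    }
}
```

With the records `ok`, `bad`, `next` and `maxRetries = 2`, the swallowing listener receives each record once, while the rethrowing listener receives `bad` three times (one delivery plus two retries) before it is skipped. In the listener from the question, the equivalent change is to let the exception in the last `catch` block propagate, or rethrow it, instead of only logging it.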
