用于设置seektocurrentbatcherrorhandler的spring kafka属性

waxmsbnn  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(779)

处理批处理时是否存在spring.kafka属性错误 spring.kafka.listener.type=BATCH 以及 spring.kafka.listener.ack-mode=BATCH 使用seektocurrentbatcherrorhandler?提前谢谢。

cwdobuhd

cwdobuhd1#

不能使用属性设置它,但可以覆盖boot的自动配置容器工厂,如下所示:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);

    factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());

    return factory;
}

它将获得所有引导属性,然后您可以根据需要进一步配置工厂。

相关问题