Spring Kafka setErrorHandler已弃用替换( Boot 2.6.4)

xxslljrj  于 2023-03-17  发布在  Apache
关注(0)|答案(3)|浏览(381)

在 Spring Boot 2.6.4上,不推荐使用此方法。

public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(factory, consumerFactory());

        // deprecated
        factory.setErrorHandler(new GlobalErrorHandler());

        return factory;
    }

全局错误处理程序类

public class GlobalErrorHandler implements ConsumerAwareErrorHandler {

    private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
        // my custom global logic (e.g. notify ops team via slack)
    }

}

这个的替换示例是什么?文档说我应该使用setCommonErrorHandler,但是如何实现CommonErrorHandler接口,因为那里没有方法可以覆盖。
关键是,我必须根据特定条件(消息类型,可在Kafka消息头中找到)向操作团队发送空闲通知。
这不是阻塞,只是一个恼人的消息,虽然。谢谢

wydwbb8l

wydwbb8l1#

参见Spring for Apache Kafka文档;传统错误处理程序被CommonErrorHandler实现所取代。

最新消息

https://docs.spring.io/spring-kafka/docs/current/reference/html/#x28-eh
旧版GenericErrorHandler及其用于记录批处理监听程序的子接口层次结构已被新的单个接口CommonErrorHandler所取代,该接口的实现与GenericErrorHandler的大多数旧版实现相对应。有关详细信息,请参阅容器错误处理程序。

容器错误处理程序

https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers
从版本2.8开始,旧的ErrorHandlerBatchErrorHandler接口已被新的CommonErrorHandler取代。这些错误处理程序可以处理记录监听器和批处理监听器的错误。从而允许单个监听器容器工厂为两种类型的监听器创建容器。提供了CommonErrorHandler实现以替换大多数旧框架错误处理程序实现,旧错误处理程序已弃用。侦听器容器和侦听器容器工厂仍然支持遗留接口;在将来的版本中将不建议使用它们。

f5emj3cl

f5emj3cl2#

我遇到了完全相同的问题,所以我将方法实现ConsumerAwareErrorHandler更改为
常见错误处理程序
并得到执行
句柄记录
就像文档中描述的那样,它起作用了!

public class GlobalErrorHandler implements CommonErrorHandler {

  private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);

  @Override
  public void handleRecord(
      Exception thrownException,
      ConsumerRecord<?, ?> record,
      Consumer<?, ?> consumer,
      MessageListenerContainer container) {
    log.warn("Global error handler for message: {}", record.value().toString());
  }
}

在Kafka配置类中

@Bean(value = "kafkaListenerContainerFactory")
  public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
      ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
    var factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, consumerFactory());

    factory.setCommonErrorHandler(new GlobalErrorHandler());

    return factory;
  }
yws3nbqq

yws3nbqq3#

对于正在使用更新版本的Spring-Boot 3.0.x的人来说,下面是基于@GarryRussel输入的实现:handleRecord()已从Sping Boot 2.9中弃用,handleOne()是其替代品。

factory.setCommonErrorHandler(new CommonErrorHandler() {
        @Override
        public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {
            return CommonErrorHandler.super.handleOne(thrownException, record, consumer, container);
        }

        @Override
        public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
            CommonErrorHandler.super.handleOtherException(thrownException, consumer, container, batchListener);
        }
    });

相关问题