需要spring kafka自定义日志记录

zpf6vheq  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(371)

我有一个标准的SpringKafka设置如下,

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> feedbackStreamListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
@KafkaListener(topics = ("my-topic"),groupId = ("groupId"),containerFactory = "listenerContainerFactory")
    public void myListener(@Payload String message) {
        System.out.println("Received Message : " + message);
        // do some heavy processing
    }

现在我需要在3个场景上定制日志(从我的应用程序生成日志),
当我引导消费者时,比如说kafka集群关闭了或者我提供了错误的引导服务器,我需要自定义日志。
当使用者成功启动时,在轮询之前提供一个自定义日志。
当存在自定义消息转换器时,如果存在反序列化问题,则自定义日志。

7cwmlq89

7cwmlq891#

看到了吗 KafkaEvent 实施:https://docs.spring.io/spring-kafka/docs/current/reference/html/#events.
所以,当你抓住 ConsumerFailedToStartEvent@EventListener ,您可以使用 ConsumerStartedEvent .
甚至对于反序列化问题也没有解决方法,但是您肯定可以使用 GenericErrorHandler 要在使用者发生错误时记录某些内容,请执行以下操作:https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-错误处理

相关问题