spring云流与kafka集成错误处理

brqmpdu1  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(187)

我正在尝试用springcloudstream和kafka集成创建一个spring启动应用程序。我在kafka中创建了一个带有1个分区的示例主题,并根据这里给出的说明从spring启动应用程序发布到该主题
http://docs.spring.io/spring-cloud-stream/docs/1.0.2.release/reference/htmlsingle/index.html
以及
https://blog.codecentric.de/en/2016/04/event-driven-microservices-spring-cloud-stream/
Spring 启动应用程序-

@SpringBootApplication
public class MyApplication {

    private static final Log logger = LogFactory.getLog(MyApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
}

Kafka制片人班

@Service
@EnableBinding(Source.class)
public class MyProducer {

    private static final Log logger = LogFactory.getLog(MyProducer.class);

    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
    public MessageSource<TimeInfo> timerMessageSource() {
        TimeInfo t = new TimeInfo(new Timestamp(new Date().getTime())+"","Label");
        MessageBuilder<TimeInfo> m = MessageBuilder.withPayload(t);
        return () -> m.build();
    }

    public static class TimeInfo{

        private String time;
        private String label;

        public TimeInfo(String time, String label) {
            super();
            this.time = time;
            this.label = label;
        }

        public String getTime() {
            return time;
        }

        public String getLabel() {
            return label;
        }

    }
}

除了我想处理异常的时候,一切都很好。
如果kafka主题关闭了,我可以看到connectionrejected异常被抛出到应用程序的日志文件中,但是内置的重试逻辑似乎在不停地不断重试!
我根本没有抛出异常来处理和做进一步的异常处理。我已经阅读了上面springcloudstream文档中针对kafka的producer选项和binder选项,我看不到任何定制选项可以让我捕获上面抛出的异常。
我对spring-boot/spring-cloud-stream/spring集成(它似乎是云流项目的底层实现)还很陌生。
你们还知道什么可以让这个异常级联到我的SpringCloudStream应用程序吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题