Kafka TimeoutException stops application startup: how do I handle this correctly?

Posted by kg7wmglp on 2021-06-04 in Kafka

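We use a Kafka consumer in a Spring Boot application. When the beans are created, the KafkaListenerEndpointRegistry is created and tries to fetch metadata for its listener containers. The problem is that we sometimes get a TimeoutException at that point, which stops the application from starting. See the full stack trace below.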

2019-08-28 07:49:46,053 [main] ERROR o.s.boot.SpringApplication - Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
    at org.springframework.boot.web.servlet.support.SpringBootServletInitializer.run(SpringBootServletInitializer.java:151)
    at org.springframework.boot.web.servlet.support.SpringBootServletInitializer.createRootApplicationContext(SpringBootServletInitializer.java:131)
    at com.pack.MyApplication.createRootApplicationContext(MyApplication.java:45)
    at org.springframework.boot.web.servlet.support.SpringBootServletInitializer.onStartup(SpringBootServletInitializer.java:91)
    at org.springframework.web.SpringServletContainerInitializer.onStartup(SpringServletContainerInitializer.java:171)
    at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5132)
    at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:183)
    at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:717)
    at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:690)
    at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:705)
    at org.apache.catalina.startup.HostConfig.deployWAR(HostConfig.java:978)
    at org.apache.catalina.startup.HostConfig$DeployWar.run(HostConfig.java:1849)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.tomcat.util.threads.InlineExecutorService.execute(InlineExecutorService.java:75)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at org.apache.catalina.startup.HostConfig.deployWARs(HostConfig.java:773)
    at org.apache.catalina.startup.HostConfig.deployApps(HostConfig.java:427)
    at org.apache.catalina.startup.HostConfig.start(HostConfig.java:1576)
    at org.apache.catalina.startup.HostConfig.lifecycleEvent(HostConfig.java:309)
    at org.apache.catalina.util.LifecycleBase.fireLifecycleEvent(LifecycleBase.java:123)
    at org.apache.catalina.util.LifecycleBase.setStateInternal(LifecycleBase.java:423)
    at org.apache.catalina.util.LifecycleBase.setState(LifecycleBase.java:366)
    at org.apache.catalina.core.ContainerBase.startInternal(ContainerBase.java:936)
    at org.apache.catalina.core.StandardHost.startInternal(StandardHost.java:841)
    at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:183)
    at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1384)
    at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1374)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.tomcat.util.threads.InlineExecutorService.execute(InlineExecutorService.java:75)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
    at org.apache.catalina.core.ContainerBase.startInternal(ContainerBase.java:909)
    at org.apache.catalina.core.StandardEngine.startInternal(StandardEngine.java:262)
    at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:183)
    at org.apache.catalina.core.StandardService.startInternal(StandardService.java:421)
    at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:183)
    at org.apache.catalina.core.StandardServer.startInternal(StandardServer.java:932)
    at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:183)
    at org.apache.catalina.startup.Catalina.start(Catalina.java:633)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.catalina.startup.Bootstrap.start(Bootstrap.java:344)
    at org.apache.catalina.startup.Bootstrap.main(Bootstrap.java:475)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

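Since I am new to Kafka, I don't know how to handle this properly so that my application still starts even when the metadata cannot be fetched.
I considered setting an ErrorHandler on the container factory, but I am not sure whether that would solve my problem.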

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setRetryTemplate(getRetryTemplate());
        factory.setErrorHandler(new LoggingErrorHandler());
        factory.setAutoStartup(false);
        return factory;
    }

    @Bean
    public RetryPolicy getRetryPolicy(){
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(getMaxRetryAttempts());
        return simpleRetryPolicy;
    }

    @Bean
    public FixedBackOffPolicy getBackOffPolicy() {
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(getRetryInterval());
        return backOffPolicy;
    }

    @Bean
    public RetryTemplate getRetryTemplate(){
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(getRetryPolicy());
        retryTemplate.setBackOffPolicy(getBackOffPolicy());
        return retryTemplate;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.consumer.bootstrap-servers"));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, env.getProperty("spring.kafka.consumer.key-deserializer"));
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, env.getProperty("spring.kafka.consumer.value-deserializer"));
        props.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("spring.kafka.consumer.group-id"));
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("spring.kafka.consumer.auto-offset-reset"));
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, env.getProperty("spring.kafka.consumer.client-id"));

        props.put(SaslConfigs.SASL_MECHANISM, env.getProperty("kafka.sasl.mechanism"));
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, env.getProperty("kafka.security.protocol.config"));
        props.put(SaslConfigs.SASL_JAAS_CONFIG, env.getProperty("kafka.sasl.jaas.configuration"));

        try {
            Resource res = new ClassPathResource(env.getProperty("spring.kafka.ssl.trust-store-location"));
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, res.getFile().getAbsolutePath());
        } catch (IOException e) {
            logger.error("SSL Truststore for Kafka connection could not be found!", e);
        }
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, env.getProperty("avro.kafka.consumer.deserializer.schema-url"));
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
                env.getProperty("avro.kafka.consumer.deserializer.specific-avro-reader"));

        return props;
    }

    private int getMaxRetryAttempts(){
        return this.retryAttempts;
    }

    private int getRetryInterval(){
        return this.retryInterval;
    }

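Can anyone explain how ContainerStoppingErrorHandler works in this situation and whether it could solve my problem? I also considered creating the KafkaListenerEndpointRegistry bean myself, but I don't know how to configure it so that the exception is handled properly and application startup continues. Should I be looking at autoStartup here, and if so, what is the correct way to control and handle container startup?
Versions used: spring-kafka 2.2.7.RELEASE, spring-boot-starter-parent 2.1.6.RELEASE
Thanks in advance.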
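
To make the autoStartup idea in the question concrete, here is a minimal, framework-free sketch of the pattern: start every container separately after the application is up, catch the failure of each one, and retry the failed ones a few times instead of letting a single TimeoutException abort startup. Everything in it is an assumption for illustration: `Startable`, `startAll` and `startWithRetry` are hypothetical names, and `Startable` is only a stand-in for a listener container. In spring-kafka the corresponding calls would presumably be `KafkaListenerEndpointRegistry.getListenerContainers()` and `start()` on each returned container, with `factory.setAutoStartup(false)` as in the configuration above. Whether this fully avoids the failure with these exact versions is not confirmed here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: models "start each container on its own and tolerate failures".
final class DeferredContainerStart {

    /** Hypothetical stand-in for a listener container; only start() matters here. */
    interface Startable {
        void start() throws Exception;
    }

    /**
     * Tries to start every container once and returns the ids that failed,
     * so the caller can log or retry them instead of aborting startup.
     */
    static List<String> startAll(Map<String, Startable> containers) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Startable> entry : containers.entrySet()) {
            try {
                entry.getValue().start();
            } catch (Exception e) {
                // Record the failure for this container only; the others still get their turn.
                failed.add(entry.getKey());
            }
        }
        return failed;
    }

    /**
     * Runs up to maxAttempts rounds, pausing pauseMillis between rounds,
     * and returns the ids that never started.
     */
    static List<String> startWithRetry(Map<String, Startable> containers,
                                       int maxAttempts,
                                       long pauseMillis) throws InterruptedException {
        Map<String, Startable> pending = new LinkedHashMap<>(containers);
        for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
            List<String> failed = startAll(pending);
            // Keep only the containers that failed in this round.
            pending.keySet().retainAll(failed);
            if (!pending.isEmpty() && attempt < maxAttempts) {
                Thread.sleep(pauseMillis);
            }
        }
        return new ArrayList<>(pending.keySet());
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Startable> containers = new LinkedHashMap<>();
        containers.put("healthy", () -> { });
        containers.put("unreachable", () -> {
            throw new java.util.concurrent.TimeoutException("simulated metadata timeout");
        });
        System.out.println("Not started: " + startWithRetry(containers, 2, 10L));
    }
}
```

In a Spring Boot application, logic like this would typically run from a listener for `ApplicationReadyEvent` (or on a background thread), so that an unreachable broker delays only the consumers and not the rest of the application.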
