当重试次数耗尽时,如何以编程方式退出Sping Boot 应用程序,以防止Kafka偏移提交

beq87vna  于 2023-04-11  发布在  Apache
关注(0)|答案(3)|浏览(127)

我想在重试次数耗尽时以编程方式退出Sping Boot 应用程序,以便永远不会错过处理任何事件。System.exit()和SpringBootApplication.exit()都不会终止应用程序。
该应用程序正在使用一个@KafkaListener函数,该函数将事件保存到数据库,并具有一个带有客户恢复器的SeekToCurrentErrorHandler。恢复器是我试图退出应用程序以防止Kafka偏移量被提交的地方。
该应用程序也有一个@Scheduled功能,也许是什么阻止终止??
在谷歌上搜索这个结果是空的。我甚至不确定这是正确的方法,例如,如果在这里终止实际上会阻止偏移提交。下面是我尝试配置恢复器的代码(在Kotlin中):

fun kafkaListenerContainerFactory(
        configurer: ConcurrentKafkaListenerContainerFactoryConfigurer,
        kafkaConsumerFactory: ConsumerFactory<Any?, Any?>?
    ): ConcurrentKafkaListenerContainerFactory<*, *>? {
        val factory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
        configurer.configure(factory, kafkaConsumerFactory)
        factory.setErrorHandler(
            SeekToCurrentErrorHandler(
                { _, ex ->
                    //System.exit(-1)
                    SpringApplication.exit(appContext, { -1 })
                })
        )
        return factory
    }
hc2pp10m

hc2pp10m1#

也许不是完全相同的情况下,但希望仍然有用的人:我希望我的Sping Boot 应用程序在Kafka消费者停止时关闭(通常与身份验证/授权问题有关)。Spring内部事件可以用于此。我并不是说这是最好或最正确的解决方案,但经过几次尝试,这对我来说是有效的:

import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.event.ConsumerStoppedEvent;
import org.springframework.stereotype.Component;
    
@Component
public class MyConsumer implements ApplicationContextAware {
    private ApplicationContext appCtxt;

    @Override
    public void setApplicationContext(@NotNull ApplicationContext applicationContext) throws BeansException {
        appCtxt = applicationContext;
    }

    @KafkaListener(
        topics = topicName,
        groupId = ...,
        clientIdPrefix = ...,
        containerFactory = ...
    )
    public void consume(ConsumerRecord<String, MyEvent> msg) { ... }

    @EventListener
    public void eventHandler(ConsumerStoppedEvent event) {
        log.warn("A Kafka-consumer stopped. Shutting down entire application. " + event);
        ((ConfigurableApplicationContext) appCtxt).close();
    }
}
kx1ctssn

kx1ctssn2#

Boot 注册了一个shutdown hook,以便每当JVM退出时(例如使用System.exit),应用程序上下文都将关闭。
关闭应用程序上下文包括停止诸如侦听器容器之类的组件;所以在那里停止它会导致几秒钟的死锁。
您应该使用类似于ContainerStoppingErrorHandler中的逻辑;并在停止容器后引发异常。

编辑

下面是一个例子:

@SpringBootApplication
public class So67076203Application {

    private static final Logger log = LoggerFactory.getLogger(So67076203Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So67076203Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so67076203").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "so67076203", topics = "so67076203")
    public void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("x");
    }

    @Bean
    ErrorHandler stceh(ConsumerRecordRecoverer recoverer) {
        return new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(2000L, 2L));
    }

    @Bean
    ConsumerRecordRecoverer recoverer(KafkaListenerEndpointRegistry registry) {
        return (cr, ex) -> {
            log.error("Stopping container due to error " + ex.getMessage());
            try {
                stopper().handle(ex, null, null, registry.getListenerContainer("so67076203"));
            }
            catch (KafkaException e) {
                log.error("shutting down the JVM " + e.getMessage());
                new SimpleAsyncTaskExecutor().execute(() -> System.exit(-1));
                throw e;
            }
        };
    }

    // can't be a bean because then Boot won't auto configure the stceh
    private ContainerStoppingErrorHandler stopper() {
        return new ContainerStoppingErrorHandler();
    }

}

@Component
class Customizer {

    Customizer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
        factory.getContainerProperties().setCommitLogLevel(Level.INFO);
        factory.getContainerProperties().setStopImmediate(true);
    }

}

编辑

在2.8及更高版本中,SeekToCurrentErrorHandlerDefaultErrorHandler取代。

brc7rcf0

brc7rcf03#

我无法得到加里Russell的答案来停止我的应用程序,但找到了一个简单的解决方案。也许不完全“引导正确”或优雅,但它确实起作用。我所做的是注入KafkaListenerEndpointRegistry和我的SeekToCurrentErrorHandler调用:

registry.stop() // Injected KafkaListenerEndpointRegistry
System.exit(SpringApplication.exit(appContext, { -1 }))

相关问题