所以问题很简单,但我很长一段时间都找不到答案。
我有我的Kafka消费者手动确认,在应用程序关闭之前,我想执行一些代码,然后确认Kafka。因此,我使用@predestroy注解:
@PreDestroy
private void beforeShutdown() {
//do some code
acknowledgement.acknowledge(); //this variable is stored on class level
现在主要的问题是,kafka consumer在它执行之前被关闭,因此消息实际上没有得到确认,我在启动应用程序时再次收到它们,所以我需要某种解决方法或其他方法来指定这个函数作为关闭前调用的第一个函数。从日志中可以看出这一点:
Shutting down ExecutorService
Consumer stopped
Shutting down ExecutorService 'taskScheduler'
Shutting down ExecutorService 'applicationTaskExecutor'
EXECUTING MY CODE
[Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
如果有人有什么建议,请告诉我。
1条答案
按热度按时间umuewwlo1#
实施
SmartLifeCycle
把密码放进去stop()
. 把豆子放在很高的地方Phase
所以它在容器前停止。集装箱是同相的Integer.MAX_VALUE - 100
默认情况下,它必须高于这个值。编辑