java 应用程序停止时,从SmartLifeCycle stop()正确提交最新处理的Kafka偏移

btqmn9zl  于 2023-03-16  发布在  Java
关注(0)|答案(1)|浏览(97)

Kafka - commiting offset before consumer is shut down on app stop + commiting offset from the past
这里的答案建议从SmartLifeCycle的stop()方法提交。
我使用的是spring-kafka和@KafkaListener注解,如何才能正确地从SmartLifeCycle stop()方法中的listener获取Acknowledgement对象?我现在所做的是为consumer创建一个全局变量,然后将其赋值为从listener内部获取的consumer。然后在stop()中调用:

consumer.commitSync(processedOffsets)

它按预期的那样工作。但我认为它不正确!
在stop()方法内部提交也会引发警告:
“Kafka消费者对多线程处理不安全”
因为它运行在关闭线程上而不是Kafka侦听器线程上。
我可以知道提交stop()方法的最新偏移量的正确方法吗?

jk9hmnmh

jk9hmnmh1#

您不应直接与消费者互动。
容器在停止时会自动提交任何已确认的偏移量。因此请停止容器。

相关问题