我对Kafka很陌生。使用SpringBootKafka,我使用一个消息对象和手动确认开发了一个发布者和一个使用者。我的代码使用spring注解。这很好用。现在,当我连接到生产代理时,这个代理不会发送一条消息,而是一个消息列表。我的侦听器方法具有以下签名:
@KafkaListener (topics="MessagesTopic", containerFactory="messageContainerfactory")
public void listen(@Payload Message message, Acknowledgment ack)
所以我可以确认每一条信息。很好。但现在看来我必须用
@KafkaListener (topics="MessagesTopic", containerFactory="messageContainerfactory")
public void listen(@Payload List<Message> messages, Acknowledgment ack)
即使遵循文档,我似乎也应该使用
@KafkaListener (topics="MessagesTopic", containerFactory="messageContainerfactory")
public void listen(@Payload List<Message> messages, Acknowledgment ack, Consumer<?,?> consumer)
我应该将batchmode设置为true吗?
现在的问题是:当这条消息被完全处理后,我如何确认每条消息?
非常感谢你的帮助
1条答案
按热度按时间ma8fv8wu1#
如果您确实想手动提交偏移量,那么类似的方法可以帮助您。
如果你不想要它,那就换
setAckMode
其他价值。这件事是按Spring的方式做的。
CoreAutoConfiguration
班级:然后你就走了
Config
班级:最后,消费者:
最后,属性: