当我将enable.auto.commit设置为false并尝试使用基于注解的spring-kafka@kafkalistener手动提交偏移量时,我得到一个org.springframework.kafka.listener.listenerexecutionfailedexception:无法使用传入消息调用侦听器方法
我有一个非常简单的代码如下:
@KafkaListener(id = "someid", topics = "${demo.topic}", containerFactory = "someContainerFactory")
public void listenFooGroup(String message, Acknowledgement ack) {
System.out.println("Received Messasge in group 'foo': " + message);
// TODO: Do something with the message
}
当我从制作人那里发送消息时,我得到以下例外:
org.springframework.kafka.listener.listenerexecutionfailedexception:无法对传入消息调用侦听器方法。
终结点处理程序详细信息:
方法[public void com.******************.kafkamessagelistener.listenfoogroup(java.lang.string,org.springframework.kafka.support.acknowledgement)]
bean[com.......*.*****。kafkamessagelistener@5856dbe4]; 嵌套异常为org.springframework.messaging.converter.messageconversionexception:无法处理消息;嵌套异常为org.springframework.messaging.converter.messageconversionexception:无法将genericmessage[payload=test,headers={kafka\u offset=57,kafka\u receivedmessagekey=null,kafka\u receivedpartitionid=0,kafka\u receivedtopic=demotopic}]从[java.lang.string]转换为[org.springframework.kafka.support.acknowledment],failedmessage=genericmessage[payload=test,headers={kafka\u offset=57,kafka\u receivedmessagekey=null,kafka\u receivedpartitionid=0,kafka\u receivedtopic=demotopic}]
请帮忙。蒂亚。
1条答案
按热度按时间3qpi33ja1#
你必须设置集装箱工厂的
containerProperties
确认模式为MANUAL
或者MANUAL_IMMEDIATE
得到一个Acknowledgment
对象。对于其他ack模式,容器负责提交偏移量。
或设置
....ackMode
属性(如果使用spring boot)