这是一个后续问题-阅读Kafka的同一信息多次。如果有更好的方式问这个问题而不张贴新的问题,让我知道。在这篇文章中,加里提到
“但如果以后的邮件已经被检索到,您仍将首先看到这些邮件,因此您也必须丢弃这些邮件。”
调用seek()后,有没有一种干净的方法来丢弃poll()已经读取的消息?我通过保存初始偏移量(在recordoffset中)并在成功时增加它来实现这个逻辑。失败时,我调用seek()并将recordoffset的值设置为record.offset()。然后,对于每个新消息,我检查record.offset()是否大于recordoffset。如果是,我只需调用acknowledge(),从而“丢弃”所有先前读取的消息。这是密码-
// in onMessage()...
if (record.offset() > recordOffset){
acknowledgment.acknowledge();
return;
}
try {
processRecord(record);
recordOffset = record.offset()+1;
acknowledgment.acknowledge();
} catch (Exception e) {
recordOffset = record.offset();
consumerSeekCallback.seek(record.topic(), record.partition(), record.offset());
}
这种方法的问题是,多个分区会使它变得复杂。有更简单/更干净的方法吗?
编辑1基于gary的建议,我尝试添加一个类似这样的错误处理程序-
@KafkaListener(topicPartitions =
{@org.springframework.kafka.annotation.TopicPartition(topic = "${kafka.consumer.topic}", partitions = { "1" })},
errorHandler = "SeekToCurrentErrorHandler")
当我得到“cannot resolve method'errorhandler'”时,这个语法有什么问题吗?
Edit2在gary解释了这两个错误处理程序之后,我删除了上面的错误处理程序,并将下面的内容添加到配置文件中-
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProps()));
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
return factory;
}
当我启动应用程序,我得到这个错误现在。。。
java.lang.nosuchmethoderror:org.springframework.util.assert.state(zljava/util/function/supplier;)v位于org.springframework.kafka.listener.adapter.messagingmessagelisteneradapter.determineinferredtype(messagingmessagelisteneradapter。java:396)
这是396号线-
Assert.state(!this.isConsumerRecordList || validParametersForBatch,
() -> String.format(stateMessage, "ConsumerRecord"));
Assert.state(!this.isMessageList || validParametersForBatch,
() -> String.format(stateMessage, "Message<?>"));
2条答案
按热度按时间x9ybnkn61#
你走的是正确的道路,是的,你必须处理不同的分区以及。有一个
FilteringMessageListenerAdapter
,但你还是要写逻辑。xpcnnkqh2#
从版本2.0.1开始,如果容器
ErrorHandler
是一个RemainingRecordsErrorHandler
,例如SeekToCurrentErrorHandler
,其余的记录(包括失败的记录)将被发送到错误处理程序而不是侦听器。这允许
SeekToCurrentErrorHandler
重新定位每个分区,以便下次轮询将返回未处理的记录。编辑
有两种类型的错误处理程序。这个
KafkaListenerErrorHandler
(在注解中指定)在侦听器级别工作;它连接到调用@KafkaListener
注解,因此只能访问当前记录。第二个错误处理程序(在侦听器容器上配置)在容器级别工作,因此可以访问其余的记录。这个
SeekToCurrentErrorHandler
是容器级错误处理程序。它是在容器工厂的容器属性上配置的。。。