spring kafka在侦听器中按条件丢弃消息

bjg7j2ky  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(421)

在我的spring boot/kafka项目中,我有以下侦听器:

@KafkaListener(topics = "${kafka.topic.update}", containerFactory = "updateKafkaListenerContainerFactory")
public void onUpdateReceived(ConsumerRecord<String, Update> consumerRecord, Acknowledgment ack) {

    // do some logic

    ack.acknowledge();
}

在监听器内部,我需要根据我的业务逻辑检查一些条件,如果不满足,则跳过对该消息的处理,并让kafka知道重新传递该消息一次。
我之所以需要这样做,是因为根据我应用程序的业务逻辑,我需要避免在特定的电报聊天中每秒发送一个以上的邮件。这就是为什么我想在kafka侦听器中检查chatlaststent时间,并在需要时推迟消息发送(通过消息重新传递到此kafka主题)
如何正确地做到这一点?我只需要不执行 ack.acknowledge(); 这一次还是有另一个更合适的方法来实现它?

4dbbbstv

4dbbbstv1#

您可以使用recordfilterstrategy。
请参见以下文档:https://docs.spring.io/spring-kafka/docs/2.0.5.release/reference/html/_reference.html#_filtering_messages

yzckvree

yzckvree2#

使用 SeekToCurrentErrorHandler .
当抛出异常时,容器将调用错误处理程序,该处理程序将重新查找未处理的消息,以便在下次轮询时再次获取这些消息。

相关问题