我使用spring kafka api实现kafka consumer和手动偏移管理:
@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
if (someCondition) {
acknowledgment.acknowledge();
}
}
这里,我希望消费者只在 someCondition
持有。否则,消费者应该睡眠一段时间,然后再次阅读相同的消息。
Kafka配置:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig());
factory.getContainerProperties().setAckMode(MANUAL);
return factory;
}
private Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
...
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
...
return props;
}
使用当前配置,如果 someCondition == false
,使用者不提交偏移量,但仍读取下一条消息。有没有办法让消费者重读Kafka acknowledgement
没有表演?
3条答案
按热度按时间pb3s4cty1#
您可以停止并重新启动容器,它将被重新发送。
在即将发布的1.1版本中,您可以寻找所需的偏移量,然后它将被重新发送。
但是如果以后的消息已经被检索到,您仍然会首先看到它们,因此您也必须丢弃它们。
第二个里程碑有这个功能,我们预计它将在下周发布。
vvppvyoh2#
正如@gary已经指出的,你的方向是正确的,
seek()
就是这样做的。今天,当我面对这个问题时,我找不到它的代码示例。这是给任何想解决这个问题的人的代码。dojqjjoe3#
你可以试着用
nack(long sleep)
其中唯一的参数表示sleep interval ms
实现上述行为。来自spring for apache kafka文档:
从版本2.3开始,确认接口有两个附加方法nack(long sleep)和nack(int index,long sleep)。第一个用于记录侦听器,第二个用于批处理侦听器。为侦听器类型调用错误的方法将引发illegalstateexception。
将上述信息应用到代码示例中,我们得到:
配置如下:
该示例生成:
注:
KafkaEventPojo
是我对pojo的实现,它按照我们的内部结构保存存储在kafka中的记录数据,所以您可以根据需要进行任何更改。另外,上面的代码演示了nack在单记录侦听器中的用法。如果您需要批处理选项,您可以在提供的文档中找到如何这样做的示例。