最近,我开始和Kafka一起学习工作。我正在做的项目使用萨拉玛。
阅读我使用的信息 ConsumerGroup
.
我需要在一段时间后再读一遍这封信如果 foo
退货 false
. 如何做到这一点?
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
if ok := foo(message); ok {
session.MarkMessage(message, "")
} else {
// ???
}
}
return nil
}
1条答案
按热度按时间xdnvmnnf1#
您可以通过将以下内容包含在您的使用者组中,将使用者组的偏移重置为旧的偏移
Setup()
回拨:您也可以通过控制台实现: