使用合流kafka c#库消费消息时,我只希望在处理完当前消息后看到一条新消息(如果出现故障,我需要再次读取相同的消息)。换句话说,我不希望偏移量改变,除非我明确告诉它改变。
为了实现这一点,我在配置中禁用了自动提交(就像示例中所示):
{ "enable.auto.commit", false }
{ "auto.offset.reset", "smallest" }
然后我注解掉commit行:
while(true)
{
if (!consumer.Consume(out Message<string, string> msg, TimeSpan.FromMilliseconds(100)))
{
continue;
}
//I thought by removing this line, I would keep getting the same message (until I've processed the message and commited the offset)
//consumer.CommitAsync(msg).Result;
}
我希望通过不提交,我在调用consume()时会得到相同的消息,但事实并非如此。即使我没有提交,偏移量也会不断变化,每次消费我都会收到新消息。
请澄清我明显的误解?
1条答案
按热度按时间vkc1a9a21#
打电话
CommitAsync
将偏移量保存到Kafka存储的偏移量主题。因此,如果您要处理您的消费者并创建一个新的消费者,您将从上一个提交的偏移重新开始,或者如果您不提交偏移,您将从偏移0开始。如果不提交偏移量,但继续在应用程序中使用相同的使用者,则偏移量仍将递增,并由使用者保留在内存中。
尽管这是在offsets和consumer position部分下针对java consumer类的文档,但它更详细地说明了offsets和commiting的功能。Kafka消费文件