在处理当前消息之前,不要阅读下一条Kafka消息?

oo7oh9g9  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(368)

使用合流kafka c#库消费消息时,我只希望在处理完当前消息后看到一条新消息(如果出现故障,我需要再次读取相同的消息)。换句话说,我不希望偏移量改变,除非我明确告诉它改变。
为了实现这一点,我在配置中禁用了自动提交(就像示例中所示):

{ "enable.auto.commit", false }
{ "auto.offset.reset", "smallest" }

然后我注解掉commit行:

while(true)
{
    if (!consumer.Consume(out Message<string, string> msg, TimeSpan.FromMilliseconds(100)))
    {
        continue;
    }

    //I thought by removing this line, I would keep getting the same message (until I've processed the message and commited the offset)
    //consumer.CommitAsync(msg).Result;
}

我希望通过不提交,我在调用consume()时会得到相同的消息,但事实并非如此。即使我没有提交,偏移量也会不断变化,每次消费我都会收到新消息。
请澄清我明显的误解?

vkc1a9a2

vkc1a9a21#

打电话 CommitAsync 将偏移量保存到Kafka存储的偏移量主题。因此,如果您要处理您的消费者并创建一个新的消费者,您将从上一个提交的偏移重新开始,或者如果您不提交偏移,您将从偏移0开始。
如果不提交偏移量,但继续在应用程序中使用相同的使用者,则偏移量仍将递增,并由使用者保留在内存中。
尽管这是在offsets和consumer position部分下针对java consumer类的文档,但它更详细地说明了offsets和commiting的功能。Kafka消费文件

相关问题