kafka分区分配协议

bakd9h0s  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(397)

我正在编写集成测试,以使用confluent dotnet( Package librdkafka)验证kafka生产者-消费者配置。
在一个测试中,我想启动一个使用者,该使用者将从现有分区的末尾开始,然后发布来自生产者的消息,并Assert该使用者只使用了一条消息。
现在,使用者的启动是异步的(即:如果您调用subscribe then publish right away,那么从末尾开始的使用者将不会收到它)。在没有竞争条件的情况下,编写这个测试的合适方法是什么?一旦我完成了“partition.assign”,消费偏移量是否已经确定?我不确定,因为onpartitionassigned的回调只包含一个topicpartition,没有偏移量。
在一个相关的问题上,似乎有时,在没有任何kafka节点故障的情况下(afaict),分配给我的分区比分配给我的分区多(即:分配给同一个分区两次),这怎么可能呢?

ulydmbyx

ulydmbyx1#

设置onpartitioneof委托,当使用者到达分区的末尾时将调用该委托,当调用它时,您可以确保使用者确实正在获取给定分区的消息,并且您可以开始向它生成消息。

consumer.OnPartitionEOF += (_, end)
            => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");

相关问题