librdkafka包含函数 rd_kafka_position
获取给定主题分区的当前偏移量。但评论说:
The \p offset field of each requested partition will be set to the offset
of the last consumed message + 1, or RD_KAFKA_OFFSET_INVALID in case there was
no previous message.
换句话说,如果没有消息被消费,它将不会给你任何有用的信息。
我感兴趣的是我刚刚订阅了一个主题,而且我已经打过电话了 rd_kafka_seek
至:
寻找已知位置(在错误恢复的情况下),或
寻找分区的最末端。
我想知道的是,在这种情况下,下一条消息的偏移量是多少。换句话说,在第一种情况下,它应该是传递给的相同偏移量 rd_kafka_seek
,并且在第二种情况下,它应该是1加上当 rd_kafka_seek
被叫来了。
不幸的是,正如评论所说, rd_kafka_position
不返回此信息。如果还没有消息被消费,它会给出 -1001
( RD_KAFKA_OFFSET_INVALID
). 如果我收到一条信息然后打电话 rd_kafka_position
,它给出了正确的偏移量。
在使用任何消息之前,我是否可以调用其他函数来获取偏移量?
1条答案
按热度按时间ruyhziif1#
我不知道你在找什么……”在大多数情况下,“抵消”是特定于消费者的东西(除了下面我提到的两种情况)。它跟踪每个主题/分区的每个特定使用者的读取进度,如果该使用者还没有进行读取,则该主题/分区还没有特定于使用者的偏移量。因此,在这种情况下要求这个消费者的偏移量没有任何意义-消费者还没有读取任何内容,因此没有与之相关的偏移量,它可以从您希望它开始的任何偏移量开始。
消费者无关补偿有用的两种主要情况是:
根据消息的时间或应用程序中的某个自定义错误日志/报告,知道要从主题中的哪些偏移开始处理时
或者从主题中最早或最新的可用偏移量开始
如果您知道您希望使用者从分区中的哪个位置开始读取消息,那么您只需找到该位置,然后让使用者从此开始读取消息。然后,您可以通过询问在任何时间点的偏移量来跟踪该消费者的进度。。。。
如果你想从最早或最新的位置开始-你可以找到那个位置是什么(使用kafkaadminclient.listfosts(),例如,在2.5.x版本中-也就是在java中,我不知道python中的等效方法是什么),然后再次寻找那个位置并从中开始。
所以,简单地说,你只能期望得到一个正确的抵消消费者,如果它读了任何东西从主题;否则-唯一与消费者无关的有意义的信息将是最早的,最新的或一些特定的(已知)补偿由您确定