librdkafka c api kafka使用者未正确读取所有消息

kq0g1dla  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(606)

我正在使用 librdkafka c api使用者(特别是使用 rd_kafka_consumer_poll 我给她打电话 rd_kafka_poll_set_consumer 在此之前)
我看到的问题是,在我的谷歌测试中,我做了以下几点
给Kafka写三封信
初始化/启动kafka消费者( rd_kafka_consumer_poll )
rebalance_cb 我将每个分区的偏移量设置为 RD_KAFKA_OFFSET_STORED 并指派他们处理
在这一点上,我认为它应该读取3条消息,但它只读取最后一条消息,但令人惊讶的是,每个分区的偏移量已经更新了!
我是不是用Kafka消费品遗漏了什么?
还有一个问题是,我最初认为存储的偏移量在kafka代理中,对于topic+consumergroup id+partition组合有唯一的偏移量。
所以我认为不同的消费者群体阅读同一个主题应该有不同的抵消。
然而,情况似乎并非如此。当使用不同的消费群体时,我总是从相同的偏移量来阅读。
我怀疑这可能与offset commit有关,但不确定在哪里解决这个问题。
有什么见解吗?

wsxa1bj1

wsxa1bj11#

要查看的配置: auto.offset.reset 来自kakfa消费者文档:
如果kafka中没有初始偏移量,或者服务器上不再存在当前偏移量,该怎么办
来自librdkafka文档:
当偏移存储中没有初始偏移或所需偏移超出范围时要采取的操作:“最小”、“最早”-自动将偏移重置为最小偏移,“最大”、“最晚”-自动将偏移重置为最大偏移,“error”-触发一个错误,该错误通过使用消息并选中“message->err”来检索。类型:枚举值
默认值为latest。
此外,


# define RD_KAFKA_OFFSET_STORED -1000

所以,您试图将分区偏移量设置为-1000,这显然不是有效的偏移量。显然,librdkafka读取了本例中的最后一条消息(我没有检查代码)。

相关问题