我正在使用crdkafka中的ckafka实现。问题是关于rdkafka::kafkaconsumer。如何设置使用者从主题的开头开始?p、 链接中的consumer示例基于rdkafka::consumer,标记为“legacy only,use kafkaconsumer”先谢谢
2ledvvac1#
这不是“auto.offset.reset”的工作方式“auto.offset.reset”仅在没有有效提交的偏移量时有效。流程如下:启动使用者(重启或崩溃后)查找偏移量如果找到,从偏移恢复如果找不到,请根据auto.offset.reset设置偏移。如果您想在每次重新启动时阅读整个主题,实际上根本没有理由提交偏移量。提交偏移量的目的是要知道您离开的位置,因为您希望在重新启动后从此偏移量恢复。
olmpazwi2#
我花了好几个小时来处理这个问题,答案是,通常情况下,rtfm:)是的,您需要设置属性的值 auto.offset.reset 至 smallest ,但关键问题是-在哪里?通过此链接:高级kafka使用者(c++中的kafkaconsumer)将在默认情况下从最后一个提交的偏移量开始使用,如果主题+分区和组以前没有提交的偏移量,它将返回主题配置属性auto.offset.reset,该属性默认为latest,因此在分区的末尾开始使用(只使用新消息)。注意粗体字,我做错的是引用这个:
auto.offset.reset
smallest
rd_kafka_conf_set(_conf_handle, key, val, _errstr, sizeof(_errstr));
而不是这样:
rd_kafka_topic_conf_set(_topic_conf_handle, key, val, _errstr, sizeof(_errstr));
2条答案
按热度按时间2ledvvac1#
这不是“auto.offset.reset”的工作方式“auto.offset.reset”仅在没有有效提交的偏移量时有效。流程如下:
启动使用者(重启或崩溃后)
查找偏移量
如果找到,从偏移恢复
如果找不到,请根据auto.offset.reset设置偏移。
如果您想在每次重新启动时阅读整个主题,实际上根本没有理由提交偏移量。提交偏移量的目的是要知道您离开的位置,因为您希望在重新启动后从此偏移量恢复。
olmpazwi2#
我花了好几个小时来处理这个问题,答案是,通常情况下,rtfm:)是的,您需要设置属性的值
auto.offset.reset
至smallest
,但关键问题是-在哪里?通过此链接:
高级kafka使用者(c++中的kafkaconsumer)将在默认情况下从最后一个提交的偏移量开始使用,如果主题+分区和组以前没有提交的偏移量,它将返回主题配置属性auto.offset.reset,该属性默认为latest,因此在分区的末尾开始使用(只使用新消息)。
注意粗体字,我做错的是引用这个:
而不是这样: