我使用php-rdkafka作为php-kafka客户端。我成功地用 test
并使用下面的代码使用消息,
$kafkaConsumer = new RdKafka\Consumer();
$kafkaConsumer->addBrokers("127.0.0.1:9292");
$topic = $kafkaConsumer->newTopic("test");
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
$msg = $topic->consume(0, 1000);
if($msg){
if ($msg->err) {
echo $msg->errstr(), "\n";
break;
} else {
echo $msg->payload, "\n";
}
}
}
但当我再次尝试设置信息时 test
组并尝试使用的消息 test
组然后我得到旧消息以及新消息。所以我只想知道如何才能确认旧消息,这样我就可以只收到新消息而不是旧消息?有人能帮我擦一下吗?
我的Kafka版本是 0.11.0.1
1条答案
按热度按时间uajslkp61#
在kafka中,确认已消费消息的方法是提交其偏移量。这样,当重新启动消费程序时,它可以检索上一次提交的偏移量,并在停止时重新启动。
正如评论中所建议的,您需要使用
RD_KAFKA_OFFSET_STORED
指示使用者检索存储的偏移量。但是您还需要通过设置
group.id
配置: