rdkafka在kafka中确认消费消息?

7nbnzgx9  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(581)

我使用php-rdkafka作为php-kafka客户端。我成功地用 test 并使用下面的代码使用消息,

$kafkaConsumer = new RdKafka\Consumer();
$kafkaConsumer->addBrokers("127.0.0.1:9292");
$topic = $kafkaConsumer->newTopic("test");
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

while (true) {
    $msg = $topic->consume(0, 1000);
    if($msg){
    if ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
  }
}

但当我再次尝试设置信息时 test 组并尝试使用的消息 test 组然后我得到旧消息以及新消息。所以我只想知道如何才能确认旧消息,这样我就可以只收到新消息而不是旧消息?有人能帮我擦一下吗?
我的Kafka版本是 0.11.0.1

uajslkp6

uajslkp61#

在kafka中,确认已消费消息的方法是提交其偏移量。这样,当重新启动消费程序时,它可以检索上一次提交的偏移量,并在停止时重新启动。
正如评论中所建议的,您需要使用 RD_KAFKA_OFFSET_STORED 指示使用者检索存储的偏移量。
但是您还需要通过设置 group.id 配置:

<?php

$conf = new RdKafka\Conf();

// Set the group id. This is required when storing offsets on the broker
$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1:9292");

$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic("test", $topicConf);

// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    $message = $topic->consume(0, 120*10000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}
?>

相关问题