下面是我的消费者和生产者脚本:消费者脚本
$config = ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('localhost:9092');
$config->setGroupId('test');
$config->setBrokerVersion('1.1.1');
$config->setTopics(['Request']);
$config->setOffsetReset('earliest');
$consumer = new Consumer();
$consumer->setLogger($logger);
$consumer->start(function($topic, $part, $message) {
var_dump($message);
});
制片人脚本
$config = ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('localhost:9092');
$config->setBrokerVersion('1.1.1');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(
function() {
return [
[
'topic' => 'Request',
'value' => 'My Test Data',
],
];
}
);
$producer->success(function($result) {
var_dump($result);
});
$producer->error(function($errorCode) {
var_dump($errorCode);
});
$producer->send(true);
它工作得很好。当使用者正在运行并且生产者正在发布数据时,使用者能够读取数据。但一旦我停止消费者,然后发布数据,然后启动消费者,消费者就不会读取这些数据。我做错什么了?有人能帮忙吗?
1条答案
按热度按时间sauutmhj1#
如果你设置
auto.offset.reset
至earliest
这并不意味着消费者总是从一开始就阅读事件。当没有提交的偏移量时,该设置用于定义使用者行为。或者,可以将其配置为latest
或者none
.enable.auto.commit
默认为true
这意味着kafka消费者将定期提交其当前偏移量。此外,您还配置了一个消费者组id,这意味着您的应用程序在重新启动后将在完成的地方继续消费。
如果您总是想从一开始就处理消息,您可以设置
enable.auto.commit=false
或删除组id。请参阅消费者配置
影响偏移管理的两个主要设置是是否启用自动提交和偏移重置策略。首先,如果设置enable.auto.commit(这是默认值),那么使用者将按照auto.commit.interval.ms设置的间隔周期性地自动提交偏移量。默认值为5秒。
其次,使用auto.offset.reset来定义当没有提交位置(在组首次初始化时就是这种情况)或偏移量超出范围时使用者的行为。您可以选择将位置重置为“最早”偏移或“最新”偏移(默认值)。如果您希望自己设置初始偏移,并且愿意手动处理超出范围的错误,也可以选择“无”。