我尝试使用这里提到的轮询之间的空闲来减慢消耗速率,我还使用max.poll.interval.ms将轮询之间的空闲加倍,但它总是触发分区重新平衡,[编辑]我有5台主机,我将并发级别设置为1 [编辑2]我将轮询之间的空闲时间设置为5 min和max.poll.interval。毫秒到10分钟我还注意到这个日志“即将关闭空闲连接从105由于被闲置为540012米利斯”.我减少了轮询之间的空闲到10秒和问题消失,任何想法为什么?
private ConsumerFactory<String, GenericRecord> dlqConsumerFactory() {
Map<String, Object> configurationProperties = commonConfigs();
DlqConfiguration dlqConfiguration = kafkaProperties.getConsumer().getDlq();
final Integer idleBetweenPollInterval = dlqConfiguration.getIdleBetweenPollInterval()
.orElse(DLQ_POLL_INTERVAL);
final Integer maxPollInterval = idleBetweenPollInterval * 2; // two times the idleBetweenPoll, to prevent re-balancing
logger.info("Setting max poll interval to {} for DLQ", maxPollInterval);
overrideIfRequired(DQL_CONSUMER_CONFIGURATION, configurationProperties, ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
dlqConfiguration.getMaxPollRecords().ifPresent(maxPollRecords ->
overrideIfRequired(DQL_CONSUMER_CONFIGURATION, configurationProperties, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
);
return new DefaultKafkaConsumerFactory<>(configurationProperties);
}
2条答案
按热度按时间7z5jn7bk1#
<time to process last polled records> + <idle between polls>
必须小于max.poll.interval.ms
。编辑
容器中有逻辑来确保我们永远不会超过最大轮询间隔:
我无法重现这个问题...
x一个一个一个一个x一个一个二个x
如果您能提供一个这样的小例子来展示您所描述的行为,我将查看一下,看看出了什么问题。
7cwmlq892#
虽然加里Russel的答案是正确的,但我想详细说明一下这个答案。
您可以看到,
max.poll.interval.ms
表示消费者轮询之间可以经过的最大时间量。如果此特定消费者组 B 中的特定消费者 A 被配置为例如每15 s从代理轮询记录,并且在客户端上将max.poll.interval.ms
配置为12 s,那么,一旦12 s过去,代理将决定消费者 A 死亡,因为消费者 A 尚未被管理轮询,并且代理将从消费者组中移除此消费者 A,这当然将触发分区重新平衡。所以这就是为什么
处理上次轮询记录的时间+轮询之间的空闲时间必须小于
max.poll.interval.ms
。是一个有效的语句,因为处理上次轮询记录的时间+轮询之间的空闲时间等于拉取周期,简单地英语它只是意味着- * 您应该在超时之前进行轮询 *
顺便说一下,
max.poll.interval.ms
不是唯一用于指示消费者失败的东西。还有session.timeout.ms设置,与heartbeat.interval. ms一起使用。请注意,错误配置这2个设置也会导致分区的持续重新平衡。这超出了主题,我不会在这里解释。但FYI有一个关于所有这些设置如何生活在一起的very good answer on SO,我强烈建议你看看它。Have a nice day)