spring kafka批处理不使用较大的批处理更新偏移量

yqhsw0fo  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(326)

我使用SpringKafka文档中的批处理模式在SpringBoot应用程序中设置了一个kafka消费者。在我们开始讨论这些主题之前,它似乎一直在起作用。然后偏移量没有更新,它会一遍又一遍地处理同一批消费者消息。
默认情况下,基于框架中的某些算法,批大小看起来是动态的。因此,随着主题越来越多,它从成批成百上千条信息变成了成百上千条。我可以通过将批处理大小(max.poll.records)设置为25来解决这个问题,但在我了解根本原因之前,我并不真正信任kafka/spring-kafka。
批次需要3000 ms到15000 ms才能完成。
工厂属性:

public Map<String, Object> getKafkaConfigurationProperties() {
    ImmutableMap.Builder<String, Object> builderMap = new ImmutableMap.Builder<String, Object>()
            .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Joiner.on(',').join(getBootstrapServers()))
            .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, getKeyDeserializer())
            .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100)
            .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
            .put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000)
            .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getValueDeserializer())
            .put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 25);

    String groupId = getConsumerGroup();
    if (StringUtils.isNotBlank(groupId)) {
        builderMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    }

    return builderMap.build();

}

工厂建设:

private KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> makeKafkaListener(
        ConsumerFactory<String, String> consumerFactory, KafkaConsumerConfiguration configuration) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(6);
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    return factory;
}

侦听器服务使用注解:

@KafkaListener(
        containerFactory = "serviceListenerContainerFactory",
        group = "kafkaListenerContainers",
        id = "widgetListener",
        topics = "widget.topic.prod")
public void listenProduct(List<ConsumerRecord<String, String>> consumerRecordList) { ... stuff ... }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题