SpringKafka侦听器:在使用手动确认的最后一条消息之后仍保留偏移量

k10s72fa  于 2021-06-05  发布在  Kafka
关注(0)|答案(0)|浏览(312)

在我当前的项目中,我使用springkafka来发送、转换和检索消息。生产者将类型a的消息发送到kafka主题topic-a,然后kafka streams示例从该主题读取类型a的消息,将其Map到类型b,并将这些Map的消息写入kafka主题topic-b。最后,kafka侦听器侦听topic-b并将这些消息持久化到elasticsearch示例。
侦听器在批处理模式下工作,并配置为手动确认。在包含的消息成功持久化到elasticsearch后,批处理将得到确认。
这非常有效,除了一个边缘情况:在生产者完成发送消息(这是一个测试设置)并且侦听器检索、持久化并确认了最后一批消息之后,消费者组中的消费者仍然存在延迟:

+-----------------------------------------------------------------------------------------------------------------------+
| group               topic       partition    current-offset  log-end-offset  lag   consumer-id  host        client-id |
+-----------------------------------------------------------------------------------------------------------------------+
| consumer-group1     topic-B     2            335681           335690           9     consumer2  /172.18.0.1   client2 |
| consumer-group1     topic-B     9                 0                0           0     consumer9  /172.18.0.1   client9 |
| consumer-group1     topic-B     6            537082           537085           3     consumer6  /172.18.0.1   client6 |
| consumer-group1     topic-B     8            394747           394752           5     consumer8  /172.18.0.1   client8 |
| consumer-group1     topic-B     7            584215           584225          10     consumer7  /172.18.0.1   client7 |
| consumer-group1     topic-B     1                 0                0           0    consumer10  /172.18.0.1  client10 |
| consumer-group1     topic-B     5                 0                0           0     consumer5  /172.18.0.1   client5 |
| consumer-group1     topic-B     0           1089342          1089350           8     consumer1  /172.18.0.1   client1 |
| consumer-group1     topic-B     4                 0                0           0     consumer4  /172.18.0.1   client4 |
| consumer-group1     topic-B     3                 0                0           0     consumer3  /172.18.0.1   client3 |
+-----------------------------------------------------------------------------------------------------------------------+

我用ack mode manual\u immediate和manual测试了这一点,结果是一样的,在侦听器检索到最后一批之后,消费者延迟永远不会减少。
在我的测试中,我也用一个香草Kafka消费者做了同样的尝试。通过这种方法,在确认最后一批之后,消费者滞后是预期的(=0)。
这是非常混乱的,因为我对这两种方法使用相同的Kafka配置,所以我怀疑这与springkafka有关。有没有人遇到过同样的问题,如果有,你是如何解决的?非常感谢您的帮助!

配置

spring:
  kafka:
    listener:
      ack-mode: MANUAL # MANUAL_IMMEDIATE
      concurrency:  10
    consumer:
      client-id: "b-consumer-client-local"
      group-id: "b-consumer-group-local"
      bootstrap-servers: "localhost:9092"
      # key-deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
      # value-deserializer: "org.myproject.serialization.BDeserializer"
      # auto-commit-interval: 1000
      # auto-offset-reset: "earliest"
      enable-auto-commit: false
      # fetch-max-wait: 500
      # fetch-min-size: 1
      # heartbeat-interval: 3000
      # max-poll-records: 5000

侦听器配置

@Configuration
public class ListenerConfiguration {
    private final KafkaProperties kafkaProperties;

    @Autowired
    public ListenerConfiguration(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public ConsumerFactory<String, B> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                                                new StringDeserializer(),
                                                new BDeserializer());
    }

    public Map<String, Object> consumerConfigs() {
        return new HashMap<>(kafkaProperties.buildConsumerProperties());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, B>
                kafkaListenerContainerFactory(ConsumerFactory<String, B> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, B> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode());
        factory.setBatchListener(true);
        return factory;
    }
}

监听器

@Component
public class TopicListener implements BatchAcknowledgingMessageListener<String, B> {
    private final String topic;

    TopicListener(TopicsConfiguration topicsConfiguration) {
        this.topic = topicsConfiguration.getInput();
    }

    @Override
    @KafkaListener(id = "batch-listener", topics = "#{__listener.getTopic()}")
    public void onMessage(List<ConsumerRecord<String, B>> list, Acknowledgment acknowledgment) {
        if(saveData(list)) {
            acknowledgment.acknowledge();
        } 
    }

    private boolean saveData(List<ConsumerRecord<String, B>> list) {
        ...
    } 
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题