kafka使用spring-kafka-consumer重启组

wgxvkvu9  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(487)

我有一个相当慢的消费者,可能需要超过5分钟来处理记录。我想避免的是Kafka重新稳定这个组织。根据我的理解,为了做到这一点,我必须为Kafka经纪人设置以下属性:

group.max.session.timeout.ms = 3600001 
  group.min.session.timeout.ms = 3600000

在我的应用程序端,我有以下配置:

@Bean
      public Map<String, Object> consumerConfigs() {
        final Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            environment.getProperty("app.kafkaBrokers"));
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.valueOf(environment.getProperty("app.session.timeout.ms")) );
        propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.valueOf(environment.getProperty("app.session.timeout.ms")) + 1 );
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return propsMap;
      }

@Bean
  KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(9);// was 3
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
  }

我的听众中也有:

@KafkaListener(id = "baz", topics = "tipJobsForExecution", containerFactory="kafkaListenerContainerFactory")
  public void listen(ConsumerRecord<?, ?> record)

我的听众花了大约5分钟来处理这些信息。一结束,我就在Kafka经纪人那里读到了以下内容:
2018-05-03 10:29:11210]信息[groupcoordinator 0]:准备重新平衡groupbaz与老一代22(\uu consumer\u offset-7)(kafka.coordinator.group.groupcoordinator)
据我所知,Kafka认为消费者死了,并重新平衡了这个群体。我的问题是为什么会这样?我的一个想法是,也许心跳不是每3000毫秒心跳,因为它应该,但我不知道这有多麻烦。
提前谢谢,吉安尼斯

xj3cbfub

xj3cbfub1#

您必须了解kafka使用者的三种超时配置参数。
heartbeat.interval.ms—使用kafka的组管理设施时,消费者协调员的预期心跳间隔时间。通常应为会话的1/3。超时值默认值-3000毫秒
session.timeout.ms-如果在此会话超时过期之前代理未接收到心跳,则代理将从组中删除此使用者并启动re-balance.default值10000
max.poll.interval.ms-如果在此超时过期之前未调用poll(),则认为使用者失败,组将重新平衡默认值-300000
在您的情况下,轮询间隔似乎设置为太低的值。
参考-https://kafka.apache.org/documentation/#newconsumerconfigs

相关问题