Reactkafka使用者模板

nbewdwxp  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(294)

我看到了这样一个问题:经过一段时间,随机地,消费者停止消费某个主题的消息,并且并没有抛出任何错误。主题中没有太多消息,因为这是一个测试环境。
该主题的消费者延迟将大于0,并且将被固定在该数字上(就像没有消费者消费它一样)。我使用kafka工具检查了这个主题,我可以看到分区被分配给了一个使用者。
我正在运行多个消费者线程来消费一个主题,下面是代码:

public Runnable getRunnable() {
    return () -> {
      final ReactiveKafkaConsumerTemplate kafkaConsumerTemplate = 
          kafkaLib.createKafkaConsumerTemplate();
      kafkaConsumerTemplate.receive()
          .concatMap(record -> {
            // process message and commit offset

          })
          .subscribe();
    };
  }

  @Override
  public void run(ApplicationArguments args) {
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    for (int i = 0; i < 2; i++) {
      executorService.execute(getRunnable());
    }

}
我的问题是:如果在一段时间内没有消息推送到主题,那么kafkaconsumpertemplate.receive()是否有可能自己停止/退出/取消订阅?或者在订阅结束后线程会退出?
使用ReactKafka消费者,没有更多 @KafkaListener 所以我使用applicationrunner并生成消费者线程。有没有另一种正确的方法来启动ReactKafka消费者?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题