spring启动应用程序后的长时间运行方法

7z5jn7bk  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(318)

对于我的springbootweb应用程序,在应用程序启动之后,我想调用一个类的方法来保持它运行直到应用程序关闭。例如,方法的逻辑是使用kafka消息(长轮询)。
所以我最终得到了以下代码。它工作正常,但我正在寻找更简单或优雅的方式来做这件事。

@Component
class KafkaConsumerService : ApplicationRunner {
  private lateinit var kafkaConsumer: KafkaConsumer<String, String>

  init {
    val props = Properties()
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "127.0.0.1:9092"
    props[ConsumerConfig.GROUP_ID_CONFIG] = "AnotherDemoConsumer"
    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name

    kafkaConsumer = KafkaConsumer(props)
  }

  override fun run(args: ApplicationArguments?) {
    receiveFromKafka()
  }

  fun receiveFromKafka() {
    kafkaConsumer.subscribe(listOf("test-topic"))

    while (true) {
      val consumerRecords = kafkaConsumer.poll(3000)

      consumerRecords.forEach { record ->
        logger.info("Receive Kafka message having key: ${record.key()}, value: ${record.value()}, " +
            "partition: ${record.partition()}, offset: ${record.offset()}")
      }
    }
  }
}

对于上面的代码,我必须实现applicationrunner接口,然后重写run方法。
是否可以使用其他的spring引导特性而不使用 while 循环或类似调度程序的东西??

laik7k3q

laik7k3q1#

使用 @EnableShceduling 打开spring的调度功能。轮询可以在将由注解的方法中完成 @Scheduled(3000)

相关问题