有没有读到Kafka主题的Spring Schedule的例子?

wkyowqbh  于 2023-03-22  发布在  Apache
关注(0)|答案(3)|浏览(96)

我们尝试在指定的窗口时间读取Kafka的数据(所以我们有Kafka消费者),这意味着避免在其他时间读取数据。但是,我们不知道如何在时间段到期后关闭消费者。我想知道是否有任何例子如何做到这一点?非常感谢提前帮助我们。

zpjtge22

zpjtge221#

您可以禁用autoStartup,然后使用KafkaListenerEndpointRegistrystartstop方法手动启动Kafka containers@KafkaListener Lifecycle Management

public class KafkaConsumer {

 @Autowired
 private KafkaListenerEndpointRegistry registry;

  @KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
  public void listen(...) { ... }

  @Schedule(cron = "")
  public void scheduledMethod() {

   registry.start();

   registry.stop()
  }

但在上述方法中,无法保证所有来自Kafka的消息都将在该时间段内被消耗(这取决于负载和处理速度)

w9apscun

w9apscun2#

我有相同的用例,但我编写了调度程序,为一个批次指定最大轮询记录,并保留一个计数器,如果计数器匹配最大轮询记录,则我认为该批次的处理已经完成,因为它已经处理了在一次轮询期间获得的记录。
然后我取消订阅主题并关闭消费者。下次调度程序运行时,它将再次处理指定的最大轮询记录限制。
fixedDelayString的作用是,一旦前一个完成,调度程序在指定的时间限制后启动。

@EnableScheduling
public class MessageScheduler{

        @Scheduled(initialDelayString = "${fixedInitialDelay.in.milliseconds}", fixedDelayString = "${fixedDelay.in.milliseconds}")
        public void run(){

            /*write your kafka consumer here with manual commit*/

            /*once your batch is finished processing unsubcribe and close the consumer*/
                kafkaConsumer.unsubscribe();
                kafkaConsumer.close();
        }

}
jgovgodb

jgovgodb3#

问题Kafka spring boot listener read messages in time interval被标记为与此重复,但我认为它有不同的回答。
为了更改默认的(0ms)间隔Spring polls messages,您可以在创建containerFactory bean时设置新的idleBetweenPolls

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaEventListenerObjectContainerFactory(){
    ConcurrentKafkaListenerContainerFactory<String, String> factory = ConcurrentKafkaListenerContainerFactory()
    factory.containerProperties.setIdleBetweenPolls(500L); // new interval in milisecons
        ...
    return factory;
}

相关问题