@EnableScheduling
public class MessageScheduler{
@Scheduled(initialDelayString = "${fixedInitialDelay.in.milliseconds}", fixedDelayString = "${fixedDelay.in.milliseconds}")
public void run(){
/*write your kafka consumer here with manual commit*/
/*once your batch is finished processing unsubcribe and close the consumer*/
kafkaConsumer.unsubscribe();
kafkaConsumer.close();
}
}
问题Kafka spring boot listener read messages in time interval被标记为与此重复,但我认为它有不同的回答。 为了更改默认的(0ms)间隔Spring polls messages,您可以在创建containerFactory bean时设置新的idleBetweenPolls
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaEventListenerObjectContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory = ConcurrentKafkaListenerContainerFactory()
factory.containerProperties.setIdleBetweenPolls(500L); // new interval in milisecons
...
return factory;
}
3条答案
按热度按时间zpjtge221#
您可以禁用
autoStartup
,然后使用KafkaListenerEndpointRegistrystart
和stop
方法手动启动Kafkacontainers
@KafkaListener Lifecycle Management但在上述方法中,无法保证所有来自Kafka的消息都将在该时间段内被消耗(这取决于负载和处理速度)
w9apscun2#
我有相同的用例,但我编写了调度程序,为一个批次指定最大轮询记录,并保留一个计数器,如果计数器匹配最大轮询记录,则我认为该批次的处理已经完成,因为它已经处理了在一次轮询期间获得的记录。
然后我取消订阅主题并关闭消费者。下次调度程序运行时,它将再次处理指定的最大轮询记录限制。
fixedDelayString的作用是,一旦前一个完成,调度程序在指定的时间限制后启动。
jgovgodb3#
问题
Kafka spring boot listener read messages in time interval
被标记为与此重复,但我认为它有不同的回答。为了更改默认的(0ms)间隔Spring polls messages,您可以在创建containerFactory bean时设置新的idleBetweenPolls