如何基于start标志启动@kafkalistener

izkcnapc  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(409)

我试着只在旗帜被设置为真的时候才开始我的Kafka主义者。

@Component
public class KafkaTopicConsumer {

//Somehow wrap the listener to only start when a property value is set to true

@KafkaListener(topics = "#{@consumerTopic}", groupId = "#{@groupName}")
public void consumeMessage(ConsumerRecord<String, String> message) throws IOException {
    logger.info("Consumed message from topic: {} with message: {}", message.topic(), message);
}

有没有一种方法可以确保侦听器仅在所说的属性(如start.consumer属性)设置为true时启动?我不希望每次启动应用程序时侦听器都只在我指定要启动它时启动。有没有一个好的方法来处理这个用例?

cwxwcias

cwxwcias1#

首先,你需要设置 autoStartupfalse 给你的容器起个名字。然后您需要根据使用的标志手动启动它 @EventListener .

@Component
public class KafkaTopicConsumer {
    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Value("${start.consumer}")
    private boolean shouldStart;

    @KafkaListener(id = "myListener", autoStartup = "false", topics = "#{@consumerTopic}", groupId = "#{@groupName}")
    public void consumeMessage(ConsumerRecord<String, String> message) throws IOException {
        logger.info("Consumed message from topic: {} with message: {}", message.topic(), message);
    }

    @EventListener
    public void onStarted(ApplicationStartedEvent event) {
        if (shouldStart) {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("myListener");
            listenerContainer.start();    
        }
    }
}

注: @EventListener 如果你使用的话,我会确保集装箱装载正确 @PostConstruct 可能行不通。
编辑:
使用 @Value 注解。
注意:这种方法增加了允许 start 以及 stop 方法也可以动态调用(例如使用jmx),只需做一些更改。这有助于实现这样一种场景:我们希望禁用使用者,然后在不重新启动应用程序的情况下启用它。
另一个很好的方法,正如@makoton的回答中正确指出的,是使用 @ConditionalOnProperty . 请注意,在您的示例中,您可以将其与 @Component 而不是定义 @Bean 手动。

@Component
@ConditionalOnProperty(
        value = "start.consumer",
        havingValue = "true")
public class KafkaTopicConsumer { // ...

一切都取决于你所需要的灵活性。

0aydgbwb

0aydgbwb2#

可以将conditionalsbeans与属性一起使用

@Bean
@ConditionalOnProperty(
  value="my.custom.flag", 
  havingValue = "true")
public KafkaListener kafkaListener{
 .....
}

条件bean允许您基于属性或自定义条件启动bean。参考

相关问题