如何阻止消费者

vlurs2pr  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(311)

我有一个springcloudstreams客户机,它从一个由几个分区组成的kakfa主题中读取数据。客户端为它读取的每个kafka消息调用一个webservice。如果在重试几次后webservice不可用,我想阻止消费者从kafka读取内容。参考前面的stackoverflow问题(spring cloud stream kafka pause/resume binders),我自动连接bindingsendpoint并调用changestate()方法来尝试停止使用者,但日志显示,在调用changestate()之后,使用者继续从kafka读取消息。
我使用的是springboot2.1.2.release和springcloud版本greenwich.release。SpringCloudStreamBinderKafka的托管版本是2.1.0.0版本。我已经设置属性autocommitofset=true和autocommitonerror=false。
下面是我的代码片段。有什么我错过了吗?changestate()的第一个输入参数应该是主题名吗?
如果我想在Web服务不可用时退出使用者应用程序,我可以简单地执行system.exit()而不需要首先停止使用者吗?

@Autowired
private BindingsEndpoint bindingsEndpoint;

...
...
@StreamListener(MyInterface.INPUT)  
    public void read(@Payload MyDTO dto,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        try {

            logger.info("Processing message "+dto);
            process(dto); // this is the method that calls the webservice

        } catch (Exception e) {

            if (e instanceof IllegalStateException || e instanceof ConnectException) {

                bindingsEndpoint.changeState("my.topic.name", 
                    BindingsEndpoint.State.STOPPED);    
                // Binding<?> b = bindingsEndpoint.queryState("my.topic.name"); ==> Using topic name returns a valid Binding object                     
            }

                e.printStackTrace();
                throw (e);
  }
}
moiiocjp

moiiocjp1#

您可以通过使用绑定可视化和控制特性来实现这一点,在这里您可以可视化以及停止/启动/暂停/恢复绑定。
你也知道 System.exit() 会关闭整个jvm吗?

70gysomp

70gysomp2#

同样的问题,changestate()的第一个输入参数应该是绑定名。它对我有用

相关问题