如何在Spring的Kafka消费者中停止收听?

brvekthn  于 2022-10-23  发布在  Spring
关注(0)|答案(4)|浏览(250)

我用Spring for Apache Kafka。我想停止听我的主题,等着逃避OOM。我该怎么做呢?

sqserrrh

sqserrrh1#

Kafka提供了从主题暂停()和恢复()消费的选项。您可以使用这些方法,方法是在两者之间实现等待以从内存问题中恢复过来。
参考文献:https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

u5rb5r59

u5rb5r592#

您可以简单地使用这些指南来停止听取Kafka消费者的意见。
1-kafka-util
2-Start/Stop Kafka Consumers using Spring Kafka

eanckbw9

eanckbw93#

public static void stopConsumer(final String topic) {    
     ConcurrentMessageListenerContainer<String, String> container 
         = consumersMap.get(topic);
     container.stop();    
 }
yfwxisqw

yfwxisqw4#

有了Spring Boot和Spring Cloud,就有了一种使用执行器阻止特定消费者的方法。
Spring Cloud的Kafka Streams绑定器允许我们启动或停止与其关联的消费者或函数绑定。
在Application.Properties中添加management.endpoints.web.exposure.include = bindings(或在Application.yml中添加YAML格式的属性)
http://localhost:9009/actuator/bindings的GET调用将公开所有绑定。
http://localhost:9009/actuator/bindings/{name}的POST调用可以启动或停止绑定。
停止消费者的示例curl:curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
启动消费者的示例curl:curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
有关更多详细信息,请访问Spring Cloud Stream Binder文档。

相关问题