我用Spring for Apache Kafka。我想停止听我的主题,等着逃避OOM。我该怎么做呢?
Spring for Apache Kafka
sqserrrh1#
Kafka提供了从主题暂停()和恢复()消费的选项。您可以使用这些方法,方法是在两者之间实现等待以从内存问题中恢复过来。参考文献:https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
u5rb5r592#
您可以简单地使用这些指南来停止听取Kafka消费者的意见。1-kafka-util2-Start/Stop Kafka Consumers using Spring Kafka
eanckbw93#
public static void stopConsumer(final String topic) { ConcurrentMessageListenerContainer<String, String> container = consumersMap.get(topic); container.stop(); }
yfwxisqw4#
有了Spring Boot和Spring Cloud,就有了一种使用执行器阻止特定消费者的方法。Spring Cloud的Kafka Streams绑定器允许我们启动或停止与其关联的消费者或函数绑定。在Application.Properties中添加management.endpoints.web.exposure.include = bindings(或在Application.yml中添加YAML格式的属性)对http://localhost:9009/actuator/bindings的GET调用将公开所有绑定。对http://localhost:9009/actuator/bindings/{name}的POST调用可以启动或停止绑定。停止消费者的示例curl:curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0启动消费者的示例curl:curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0有关更多详细信息,请访问Spring Cloud Stream Binder文档。
management.endpoints.web.exposure.include = bindings
http://localhost:9009/actuator/bindings
http://localhost:9009/actuator/bindings/{name}
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
4条答案
按热度按时间sqserrrh1#
Kafka提供了从主题暂停()和恢复()消费的选项。您可以使用这些方法,方法是在两者之间实现等待以从内存问题中恢复过来。
参考文献:https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
u5rb5r592#
您可以简单地使用这些指南来停止听取Kafka消费者的意见。
1-kafka-util
2-Start/Stop Kafka Consumers using Spring Kafka
eanckbw93#
yfwxisqw4#
有了Spring Boot和Spring Cloud,就有了一种使用执行器阻止特定消费者的方法。
Spring Cloud的Kafka Streams绑定器允许我们启动或停止与其关联的消费者或函数绑定。
在Application.Properties中添加
management.endpoints.web.exposure.include = bindings
(或在Application.yml中添加YAML格式的属性)对
http://localhost:9009/actuator/bindings
的GET调用将公开所有绑定。对
http://localhost:9009/actuator/bindings/{name}
的POST调用可以启动或停止绑定。停止消费者的示例curl:
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
启动消费者的示例curl:
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
有关更多详细信息,请访问Spring Cloud Stream Binder文档。