使用RabbitMQ,我有一个生产商生产大量(成千上万)的消息。我想让消费者从队列中每次请求拉取1000条消息。我们正在使用Spring AMQP实现来获取消息,但试图弄清楚如何实现每次请求拉取多条消息。预取不t似乎是正确的选择,批处理似乎需要在生产者端完成。
向使用者发出请求以获取要作为一个组处理的消息块时,有哪些选项?
我们的设置示例
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
2条答案
按热度按时间2exbekwf1#
从版本2.2开始,就支持使用者端的批处理
只有
SimpleMessageListenerContainer
支持此功能。https://docs.spring.io/spring-amqp/docs/current/reference/html/#de-batching
从2.2版开始,SimpleMessageListenerContainer可用于在使用者端(生成器发送离散消息的位置)创建批处理。
设置容器属性
consumerBatchEnabled
以启用此功能。请在方法签章中使用
List<String> in
。https://docs.spring.io/spring-amqp/docs/current/reference/html/#receiving-batch
当使用容器工厂的Spring Boot自动配置时,设置
spring.rabbitmq.listener.simple.consumer-batch-enabled:true
(和batch-size
)。您还必须设置
batchListener=true
;引导程序没有将其作为属性提供,您可以添加类似于下面的内容来设置它。svmlkihl2#
在这里,我的建议是您可以将消息生成到多个队列中(例如:5个队列),然后为5个队列启用消费者。将数据划分并生成到5个队列,这样消费者将运行5个消费者线程(对于每个队列),消费速度将快5倍。