rabbitmq 如何使用Spring AMQP在每个请求中使用多个消息?

zysjyyx4  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(2)|浏览(312)

使用RabbitMQ,我有一个生产商生产大量(成千上万)的消息。我想让消费者从队列中每次请求拉取1000条消息。我们正在使用Spring AMQP实现来获取消息,但试图弄清楚如何实现每次请求拉取多条消息。预取不t似乎是正确的选择,批处理似乎需要在生产者端完成。
向使用者发出请求以获取要作为一个组处理的消息块时,有哪些选项?
我们的设置示例

@RabbitListener(queues = "queue")
   void listen(String in) {
      log.info(in);
   }
2exbekwf

2exbekwf1#

从版本2.2开始,就支持使用者端的批处理

/**
 * Set to true to present a list of messages based on the {@link #setBatchSize(int)},
 * if the listener supports it. This will coerce {@link #setDeBatchingEnabled(boolean)
 * deBatchingEnabled} to true as well.
 * @param consumerBatchEnabled true to create message batches in the container.
 * @since 2.2
 * @see #setBatchSize(int)
 */
public void setConsumerBatchEnabled(boolean consumerBatchEnabled) {
    this.consumerBatchEnabled = consumerBatchEnabled;
}

只有SimpleMessageListenerContainer支持此功能。
https://docs.spring.io/spring-amqp/docs/current/reference/html/#de-batching
从2.2版开始,SimpleMessageListenerContainer可用于在使用者端(生成器发送离散消息的位置)创建批处理。
设置容器属性consumerBatchEnabled以启用此功能。
请在方法签章中使用List<String> in
https://docs.spring.io/spring-amqp/docs/current/reference/html/#receiving-batch

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setBatchListener(true);
    return factory;
}

@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
    ...
}

// or

@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
    ...
}

当使用容器工厂的Spring Boot自动配置时,设置spring.rabbitmq.listener.simple.consumer-batch-enabled:true(和batch-size)。
您还必须设置batchListener=true;引导程序没有将其作为属性提供,您可以添加类似于下面的内容来设置它。

@Component
class FactoryCustomizer {

    FactoryCustomizer(SimpleRabbitListenerContainerFactory factory) {
        factory.setBatchListener(true);
    }

}
svmlkihl

svmlkihl2#

在这里,我的建议是您可以将消息生成到多个队列中(例如:5个队列),然后为5个队列启用消费者。将数据划分并生成到5个队列,这样消费者将运行5个消费者线程(对于每个队列),消费速度将快5倍。

相关问题