我正在使用一个外部服务(Service)来处理一些特定类型的对象。如果我以10个为一批发送对象,那么Service的工作速度会更快。我目前的架构如下。生产者逐个广播对象,一群消费者从队列中(逐个)提取对象并将其发送到Service。这显然不是最优的。
我不想修改生产者代码,因为它可以在不同的情况下使用。我可以修改消费者代码,但代价是增加复杂性。我也知道prefetch_count
选项,但我认为它只在网络级别工作--客户端库(pika)不允许在消费者回调中一次提取多个消息。
那么,**RabbitMQ可以在将消息发送给使用者之前创建一批消息吗?**我正在寻找类似"一次使用 * n * 条消息"的选项。
2条答案
按热度按时间mi7gmzs61#
您不能在使用者回调中对消息进行批处理,但可以使用线程安全库并使用多个线程来使用数据。这样做的好处是,您可以在五个不同的线程上获取五条消息,并在需要时组合数据。
作为一个例子,你可以看看我将如何使用我的AMQP库实现这一点。https://github.com/eandersson/amqpstorm/blob/master/examples/scalable_consumer.py
b5buobof2#
下面的代码将使用
channel.consume
来开始使用消息。当达到所需的消息数量时,我们将退出/停止。我已经设置了一个
batch_size
来防止一次拉取大量的消息。你可以随时改变batch_size
来满足你的需要。