使用RabbitMQ和Camel逐个使用消息

kokeuurv  于 2022-11-07  发布在  Apache
关注(0)|答案(2)|浏览(190)

我有一个RabbitMQ代理和一个使用消息的CamelJava应用程序,使用者处理一个消息大约需要1秒,我产生了10'000个消息;这几乎是即时的。2 RabbitMQ控制台立即报告说队列包含的消息不超过大约7 '000条。3这对我来说是个问题,因为如果消费者失败,大约3' 000条消息将丢失。
我尝试了几种选择:

  • 流缓存:camelContext.setStreamCaching(false);//或true
  • 节流:from(queue).throttle(1)
  • 预取:from(rabbitmq:exchange&queue=q&prefetchEnabled=true&prefetchCount=1&prefetchGlobal=false&prefetchSize=0

我从来没有观察到队列中的消息数量有规律地减少(10'000,然后9' 999,9 '998,等等)。情况恰恰相反:我总是可以看到消息以大块的形式从队列中出来(通常是3,000条消息),尽管使用者实际上处理它们的速度很慢。
如何逐个使用消息?

yebdmbv4

yebdmbv41#

我知道这对您来说可能有点太晚了,但我也一直在与同样的问题作斗争,我终于找到了一个解决方案,使CamelRabbitMq组件逐个使用消息。
您只需要将autoAck属性设定为false
下面我有一个整个路线定义的例子:

rabbitmq:your-exchange?routingKey=rtkey&queue=qname&autoDelete=false&durable=true&guaranteedDeliveries=true&threadPoolSize=1&autoAck=false

那么重要的部分只是:

guaranteedDeliveries=true&threadPoolSize=1&autoAck=false

threadPoolSizeguaranteedDeliveries属性对于实现您想要的内容不是必需的,我将它们放在这里只是为了完整起见。
以下是指向文档的链接

bprjcwpo

bprjcwpo2#

在事务中运行此路由。这样,失败的使用者将以将消息回滚到队列中而结束。

相关问题