我有一个RabbitMQ代理和一个使用消息的CamelJava应用程序,使用者处理一个消息大约需要1秒,我产生了10'000个消息;这几乎是即时的。2 RabbitMQ控制台立即报告说队列包含的消息不超过大约7 '000条。3这对我来说是个问题,因为如果消费者失败,大约3' 000条消息将丢失。
我尝试了几种选择:
- 流缓存:
camelContext.setStreamCaching(false);
//或true
- 节流:
from(queue).throttle(1)
- 预取:
from(rabbitmq:exchange&queue=q&prefetchEnabled=true&prefetchCount=1&prefetchGlobal=false&prefetchSize=0
我从来没有观察到队列中的消息数量有规律地减少(10'000,然后9' 999,9 '998,等等)。情况恰恰相反:我总是可以看到消息以大块的形式从队列中出来(通常是3,000条消息),尽管使用者实际上处理它们的速度很慢。
如何逐个使用消息?
2条答案
按热度按时间yebdmbv41#
我知道这对您来说可能有点太晚了,但我也一直在与同样的问题作斗争,我终于找到了一个解决方案,使CamelRabbitMq组件逐个使用消息。
您只需要将
autoAck
属性设定为false
。下面我有一个整个路线定义的例子:
那么重要的部分只是:
threadPoolSize
和guaranteedDeliveries
属性对于实现您想要的内容不是必需的,我将它们放在这里只是为了完整起见。以下是指向文档的链接
bprjcwpo2#
在事务中运行此路由。这样,失败的使用者将以将消息回滚到队列中而结束。