我想从rabbitmq队列中使用风暴喷口中的消息。
现在,我们使用springamqp从rabbitmq异步发送和接收消息。
springamqp提供了从队列中读取消息的机制(创建监听器或使用annotation@rabbitlistner)。
问题是我可以让一个侦听器从队列中读取消息。但我该如何将这个信息发送到我的风暴喷口,它在风暴群上运行?
拓扑将启动一个集群,但是在我的spout的nexttuple()方法中,我需要从这个队列中读取消息。这里能用Springamqp吗?
我已将侦听器配置为从队列中读取消息:
@RabbitListener(queues = "queueName")
public void processMessage(QueueMessage message) {
}
如何将侦听器接收到的上述消息发送到在集群上运行的喷口。
或者,喷口的nexttuple()方法如何将此方法包含在其中?有可能吗
我在这里使用java作为一种语言。
1条答案
按热度按时间idfiyjo81#
您可以使用
RabbitTemplate
receive
或者receiveAndConvert
方法。默认情况下,如果队列中没有消息,它们将返回null。
编辑:
如果你设置
receiveTimeout
(在版本1.5或更高版本中可用),receive方法将阻塞该时间(它在内部使用异步使用者,不轮询)。但是它仍然没有侦听器那么有效,因为每个方法都会创建一个新的使用者;要使用侦听器,您需要在中使用一些内部阻塞机制
nextTuple()
(例如aBlockingQueue
)等待消息。