我正在尝试使用rabbitmq和spring boot来知道消息何时被接受(ack)或不接受(nack)。
我想将消息发送到队列(通过exchange),并检查队列是否已接受该消息。实际上,我想发送到两个不同的队列,但这并不重要,我假设它是否对其中一个有效,对另一个也有效。
所以我试过用 CorrelationData
:
public boolean sendMessage(...) {
CorrelationData cd = new CorrelationData();
this.rabbitTemplate.convertAndSend(exchange, routingKey, message, cd);
try {
return cd.getFuture().get(3, TimeUnit.SECONDS).isAck();
} catch (InterruptedException | ExecutionException | TimeoutException e ) {
e.printStackTrace();
return false;
}
}
线路 cd.getFuture().get(3, TimeUnit.SECONDS).isAck()
应该得到 false
is值尚未 ack
我想是排队吧。但这永远是真的,即使 routingKey
不存在。
所以我假设这段代码正在检查消息是否已发送到 exchange
以及 exchange
说“是的,我收到了消息,它还没有被路由,但我已经收到了”。
所以,我在rabbit/spring文档中寻找了其他方法,但我没有找到方法。
再解释一下,我想说的是:
我收到了一条消息。必须将此消息发送到其他队列/exchange,但在其他两个队列确认此消息之前,不能将其从当前队列中删除(即已确认) ack
.
我有手动确认,作为一个小伪代码,我有:
@RabbitListener(queues = {queue})
public void receiveMessageFromDirect(Message message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag){
boolean sendQueue1 = sendMessage(...);
boolean sendQueue2 = sendMessage(...);
if(sendQueue1 && sendQueue2){
//both messages has been readed; now I can ack this message
channel.basicAck(tag, false);
}else{
//nacked; I can't remove the message util both queue ack the message
channel.basicNack(tag,false,true);
}
我测试了这个结构,即使队列不存在,也测试了值 sendQueue1
以及 sendQueue2
都是真的。
1条答案
按热度按时间1sbrub3j1#
确认为真;即使是无法表达的信息(我不完全清楚为什么)。
您需要启用返回的消息(并在
CorrelationData
未来完成后-correlationData.getReturnedMessage()
). 如果不为null,则消息无法路由到任何队列。只有在代理程序中存在错误,或者使用队列时,才能获得nacks
x-max-length
和溢出行为reject-publish
.