java—如何知道消息是否已确认/未确认?

wsxa1bj1  于 2021-07-12  发布在  Java
关注(0)|答案(1)|浏览(485)

我正在尝试使用rabbitmq和spring boot来知道消息何时被接受(ack)或不接受(nack)。
我想将消息发送到队列(通过exchange),并检查队列是否已接受该消息。实际上,我想发送到两个不同的队列,但这并不重要,我假设它是否对其中一个有效,对另一个也有效。
所以我试过用 CorrelationData :

public boolean sendMessage(...) {
    CorrelationData cd = new CorrelationData();
    this.rabbitTemplate.convertAndSend(exchange, routingKey, message, cd);
    try {
        return cd.getFuture().get(3, TimeUnit.SECONDS).isAck();
    } catch (InterruptedException | ExecutionException | TimeoutException e ) {
        e.printStackTrace();
        return false;
    }
}

线路 cd.getFuture().get(3, TimeUnit.SECONDS).isAck() 应该得到 false is值尚未 ack 我想是排队吧。但这永远是真的,即使 routingKey 不存在。
所以我假设这段代码正在检查消息是否已发送到 exchange 以及 exchange 说“是的,我收到了消息,它还没有被路由,但我已经收到了”。
所以,我在rabbit/spring文档中寻找了其他方法,但我没有找到方法。
再解释一下,我想说的是:
我收到了一条消息。必须将此消息发送到其他队列/exchange,但在其他两个队列确认此消息之前,不能将其从当前队列中删除(即已确认) ack .
我有手动确认,作为一个小伪代码,我有:

@RabbitListener(queues = {queue})
public void receiveMessageFromDirect(Message message, Channel channel,
                                     @Header(AmqpHeaders.DELIVERY_TAG) long tag){
    boolean sendQueue1 = sendMessage(...);
    boolean sendQueue2 = sendMessage(...);

    if(sendQueue1 && sendQueue2){
        //both messages has been readed; now I can ack this message
        channel.basicAck(tag, false);
    }else{ 
        //nacked; I can't remove the message util both queue ack the message
        channel.basicNack(tag,false,true);
    }

我测试了这个结构,即使队列不存在,也测试了值 sendQueue1 以及 sendQueue2 都是真的。

1sbrub3j

1sbrub3j1#

确认为真;即使是无法表达的信息(我不完全清楚为什么)。
您需要启用返回的消息(并在 CorrelationData 未来完成后- correlationData.getReturnedMessage() ). 如果不为null,则消息无法路由到任何队列。
只有在代理程序中存在错误,或者使用队列时,才能获得nacks x-max-length 和溢出行为 reject-publish .

相关问题