rabbitmq amqp-cpp库:channel->cancel(consumerTag)似乎无法取消

axzmvihb  于 2023-11-19  发布在  RabbitMQ
关注(0)|答案(1)|浏览(170)

我正在使用AMQP-CPP library(不是amqp C库)连接RabbitMQ。使用队列和发布一样有效。我不使用交换或任何东西,只是在一个队列上使用一个消费者。当试图取消队列时,在执行DeferredCancel::onSuccess回调后,我仍然接收消息。此外,回调(std::string consumer)为空,这应该是consumerTag吗?
这是我观察到的:

// publish many messages to "queueName"

m_channel->consume("queueName", "hardcodedConsumerTag").onReceived([](AMQP::Message m){std::cout << m.body()<< std::endl;});

m_channel->cancel("hardcodedConsumerTag").onSuccess([](std::string consumer){std::cout << "should have stopped consuming for: " << consumer << std::endl});

字符串
产出:

message 1
should have stopped consuming for: (here is just an empty string)
message 2
message 3
... until all messages have been delivered


我希望在输出“should have stopped consuming”被打印出来之后,消息会停止。

0lvr5msh

0lvr5msh1#

当cancel()请求被发送时,consume请求还没有被处理。RabbitMQ/AMQP-CPP库响应“成功”,即使没有consumer被取消,因为RabbitMQ端还没有consumer存在。然后consume()被处理,这就是为什么我看到前面提到的行为。
我通过将所有内容 Package 在回调中来修复它。我维护自己的DeferredQueue和DeferredConsumer列表,并存储onSuccess回调是否已经执行(因为AMQP-CPP中似乎没有“pending”等效项)。
如果onSuccess回调尚未执行,我会覆盖onSuccess回调,如果它已经执行,我可以正常取消。

// publish many messages to "queueName"

bool onSuccessExecuted = false;

auto& deferredConsumer = m_channel->consume("queueName", "hardcodedConsumerTag");

deferredConsumer.onReceived([](AMQP::Message m){
      std::cout << m.body()<< std::endl;
    });

deferredConsumer.onSuccess([&](){
      onSuccessExecuted=true;
      // do stuff you want to do when starting consuming a queue
    });

if (onSuccessExecuted == false){
  // this overwrites the previous onSuccess callback
  deferredConsumer.onSuccess([this](){
      cancel();
      // must still be set if we might want to cancel again later
      onSuccessExecuted=true;
    }
} else {
  // if onSuccess has already been executed we just cancel normally,
  // as the onSuccess callback won't be executed again
  cancel();
}

void cancel() {
  m_channel->cancel("hardcodedConsumerTag").onSuccess([](std::string consumer){
        std::cout << "should have stopped consuming for: " << consumer << std::endl
      });
}

字符串

相关问题