我正在使用AMQP-CPP library(不是amqp C库)连接RabbitMQ。使用队列和发布一样有效。我不使用交换或任何东西,只是在一个队列上使用一个消费者。当试图取消队列时,在执行DeferredCancel::onSuccess回调后,我仍然接收消息。此外,回调(std::string consumer)为空,这应该是consumerTag吗?
这是我观察到的:
// publish many messages to "queueName"
m_channel->consume("queueName", "hardcodedConsumerTag").onReceived([](AMQP::Message m){std::cout << m.body()<< std::endl;});
m_channel->cancel("hardcodedConsumerTag").onSuccess([](std::string consumer){std::cout << "should have stopped consuming for: " << consumer << std::endl});
字符串
产出:
message 1
should have stopped consuming for: (here is just an empty string)
message 2
message 3
... until all messages have been delivered
型
我希望在输出“should have stopped consuming”被打印出来之后,消息会停止。
1条答案
按热度按时间0lvr5msh1#
当cancel()请求被发送时,consume请求还没有被处理。RabbitMQ/AMQP-CPP库响应“成功”,即使没有consumer被取消,因为RabbitMQ端还没有consumer存在。然后consume()被处理,这就是为什么我看到前面提到的行为。
我通过将所有内容 Package 在回调中来修复它。我维护自己的DeferredQueue和DeferredConsumer列表,并存储onSuccess回调是否已经执行(因为AMQP-CPP中似乎没有“pending”等效项)。
如果onSuccess回调尚未执行,我会覆盖onSuccess回调,如果它已经执行,我可以正常取消。
字符串