rabbitmq 获取单个消息的发布者确认

ldioqlga  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(2)|浏览(171)

我有一个生产者,每当通过API调用接收到消息时,它都会排队,并且我只想在确认消息已被代理接收时返回。
我通过出版商的确认找到了方法-

using (var connection = factory.CreateConnection())
            {
                using(var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchangeName, "topic", true, false, null);
                    //This enables producer confirm
                    channel.ConfirmSelect();

                    var properties = channel.CreateBasicProperties();

                    properties.Persistent = true;

                    var body = Encoding.UTF8.GetBytes(message);

                    channel.BasicPublish(exchangeName, topic, properties, body);

                    channel.WaitForConfirms();                         

                    Console.WriteLine("I sent a message !", message);
                }
            }

我的问题是,我不想等待所有确认,只是那些与此特定的消息。我不想把它限制在一个线程/工作线程上,也不想等待所有的事情都得到确认。
js的rabbit库有一个回调函数,非常适合我的使用--但是C#版本似乎不支持它。

qnakjoqk

qnakjoqk1#

我的问题是,我不想等待所有确认,只是那些与此特定的消息。
您应该订阅BasicAcks回调,并使用它将确认与已发布的消息关联起来。
我不想将此限制为单个线程/工作线程
您可以在线程之间共享该连接,但必须为每个线程创建一个IModel示例。

55ooxyrt

55ooxyrt2#

您需要的是Publisher Confirms和事务。
将您想要确认的消息分离为单个(或子集)交易。
交易:

ch.txSelect(); <-- start transaction
ch.basicPublish("", QUEUE_NAME,
                            MessageProperties.PERSISTENT_BASIC,
                            "nop".getBytes());
ch.txCommit();<--commit transaction

您可以使用流式轻量级发布者确认,使用:

ch.setConfirmListener(new ConfirmListener() {
    public void handleAck(long seqNo, boolean multiple) {

if (multiple) {
    unconfirmedSet.headSet(seqNo+1).clear();
} else {
unconfirmedSet.remove(seqNo);
}
}
    public void handleNack(long seqNo, boolean multiple) {
        // handle the lost messages somehow
    }

我希望它能帮助

相关问题