RabbitMQ,在node中编写的pub sub模型中请求被拒绝的消息

r3i60tvu  于 12个月前  发布在  Node.js
关注(0)|答案(2)|浏览(137)

我正在创建一个简单的聊天应用程序,
1.可以有多个用户和多个聊天室。
1.每个聊天室可以容纳多个用户。

  1. rabbitMQ连接在服务器启动时建立。
    1.当用户连接到服务器时,在rabbitMQ通道中打开套接字连接。
    1.当客户端(用户)通过socket发送消息时,rabbitmq channel使用特定的路由键将其推送到交换机。
    1.在消费时,如果套接字是活动的,那么通过它发送并确认它。如果没有,不要确认它,并重新排队。
    {
//Creating a new connection
amqp.connect('amqp://localhost', (err, conn) => {

conn.createChannel(function(err, ch) {

ch.assertExchange(exchangeName, 'direct', { durable: false });
const routingKey = // some routing key;    

wss.on('connection', (ws, req) => {
  // Creating a new queue for the user
  ch.assertQueue('', { exclusive: true, persist: true, durable: true }, (err, q) => {
    // Binds with the routing key
    ch.bindQueue(q.queue, exchangeName, routingKey);

    ch.consume(q.queue, (msg) => {
      if (ws.readyState === 1) {
        ch.ack(msg);
        ws.send(` [-] Received ${msg.content.toString()}`);
      } else {
        // What will come here so that it will be requeued to a new queue with the same routing id?
      }
    }, { noAck: false });
  });

  ws.on('message', (message) => {
    ch.publish(exchangeName, routingKey, new Buffer(message));
    console.log(`[x] Sent ${message} to the exchange with routingKey ${routingKey}`);
  });
})

当所有用户都在与服务器连接的套接字中时,这将正常工作。我想实现的是,当用户的套接字连接死亡时,如果他错过了基于路由键(特定于用户)的消息,他应该能够在重新连接时再次接收这些消息。我觉得这是可能的,因为如果在任何队列中没有确认,消息可以保留在交换中。但不知道如何实施。

w7t8yxp5

w7t8yxp51#

让我纠正几个误解...
我想实现的是,当用户的套接字连接断开时,如果他错过了基于路由键(特定于用户)的消息,那么无论何时他重新连接到不同的队列,他都应该能够再次接收到这些消息
您正在使用独占队列。根据定义,这些队列在其关联的连接和通道消失时被删除。删除队列时,这些队列中的消息将丢失。
我觉得这是可能的,因为如果在任何队列中没有确认,消息可以保留在交换中。
这不是RabbitMQ的工作方式。交换机用于路由消息,队列用于存储消息。
以下评论不正确:
如果用户1和用户2在不同的主机上,显然他们应该建立不同的通道。“channel”是一个类似于连接的东西,如果两个通道使用相同的队列,它们可以接收相同的消息。
user1user2位于不同的主机上,可能运行在不同的进程中,它们将各自建立自己的连接和通道。两个消费者永远不会从同一个队列接收到相同的消息。
回到最初的问题,您需要将队列声明为not-独占和持久,并以这样的方式命名它们,即当您的应用程序重新连接时,应用程序连接到同一队列。这样,应用程序就可以使用队列中剩余的消息。
有多种方法可以让RabbitMQ删除documented的未使用队列。

m1m5dgzv

m1m5dgzv2#

消费者应该使用Acknowledgement来消费消息,如果消息被传递给消费者并且在消费者连接断开之前没有得到确认,那么RabbitMQ将重新传递它。您可以从RabbitMQ doc获取更多详细信息。
在你的情况下,当套接字连接断开时,你应该强制关闭通道,然后RabbitMQ会知道这些消息应该重新发送。
要确保邮件在服务器重新启动时仍然存在,请将exchange设置为“durable”:ch.assertExchange(exchangeName, 'direct', { durable: false });

相关问题