如何在RabbitMQ中使用Nodejs实现远程过程调用(RPC)

zxlwwiss  于 2022-11-23  发布在  RabbitMQ
关注(0)|答案(2)|浏览(235)

因此,我想取一个Json并将其解析为Object,然后实现一个RPC RabbitMQ服务器,这样我就可以通过RabbitMQ将Object发送到服务器,在那里,Object将继续保存在本地Array中,并且将通过RPC从该服务器返回到客户端,该ID将告诉对象存储的确切位置。
官方网站显示了一些RPC在RabbitMQ中的实现,在这里你可以找到他们的实现https://www.rabbitmq.com/tutorials/tutorial-six-javascript.html,在教程中他们发送一个数字,服务器将计算斐波纳契序列,并将结果返回给客户端。相反,我想发送一个对象,而不是数字,我想接收通用的唯一id(uuid)的对象,我将存储在我的程序的一个全局数组中,我改变了代码,使它将发送一个对象,并返回uuid,但它不工作。我将感谢你们的任何帮助

//this is my server_rpc.js code : 
   const amqp = require('amqplib/callback_api');
   const uuid = require("uuid/v1");

  amqp.connect('here is the url: example: localhost', (err, conn) => {

   conn.createChannel( (err, ch) => {

    let q = 'rpc_queue';

    ch.assertQueue(q, {durable: false});

    ch.prefetch(10);

    console.log(' [x] Waiting RPC requests');

    ch.consume(q, function reply(msg) {

        console.log("corralation key is: ", msg.properties.correlationId);
        let n = uuid();

        console.log(" data received ",JSON.parse(JSON.stringify(msg.content.toString())));

        console.log("corralation key is: ", msg.properties.correlationId);

        ch.sendToQueue(msg.properties.replyTo, Buffer.from(n.toString()), {correlationId: msg.properties.correlationId});

        ch.ack(msg);
    });
});

});

// and this is my client_rpc.js code : 
    const amqp = require('amqplib/callback_api');
    const uuid = require("uuid/v1");
     const express = require("express");

let data = {
"name" : "hil01", 
"region" : "weissach",
"ID" : "1",
"version" : "0.0.1"
           } 


      amqp.connect('url: example localhost ', (err, conn) => {

        conn.createChannel( (err, ch) => {

         ch.assertQueue('', {exclusive: true}, (err, q) => {

        var corr = generateUuid();
        var newHil = JSON.stringify(data);

        console.log(" [x] Requesting uuid for the registered HIL: ", newHil );

        console.log("corralation key is: ", corr);

        ch.consume(q.queue, function(msg) {

            if(msg.properties.correlationId == corr) {

                console.log(" [.] Got %s", msg.content.toString());
                setTimeout(() => { conn.close(); process.exit(0) }, 100);
            }
        }, {noAck: true});

        ch.sendToQueue('rpc_queue', Buffer.from(newHil, {correlationId: corr, replyTo: q.queue }));
    });
});

});

//method to generate the uuid, later will be replaced with the real 
   uuid function 
      var generateUuid = () => Math.random().toString() + 
      Math.random().toString() + Math.random().toString()  ;

当我运行server_rpc时,[x]等待的请求应该被打印出来,然后在一个单独的cmd中,我运行client_rpc. js,然后对象应该被发送,服务器执行并将uuid返回给我,返回给客户端。

w3nuxt5m

w3nuxt5m1#

看起来这里需要的是直接回复RPC模式:您希望发送消息并获得响应。
以下是RabbitMQ上有关直接回复的文档:https://www.rabbitmq.com/direct-reply-to.html

左侧;右侧

下面是一个客户端和服务器的示例,它们将在机箱中工作:
https://github.com/Igor-lkm/node-rabbitmq-rpc-direct-reply-to
"里面是什么"
安装RabbitMQ后,您将需要执行2个文件:
server.js

const amqp = require('amqplib');
const uuidv4 = require('uuid/v4');

const RABBITMQ = 'amqp://guest:guest@localhost:5672';

const open = require('amqplib').connect(RABBITMQ);
const q = 'example';

// Consumer
open
  .then(function(conn) {
    console.log(`[ ${new Date()} ] Server started`);
    return conn.createChannel();
  })
  .then(function(ch) {
    return ch.assertQueue(q).then(function(ok) {
      return ch.consume(q, function(msg) {
        console.log(
          `[ ${new Date()} ] Message received: ${JSON.stringify(
            JSON.parse(msg.content.toString('utf8')),
          )}`,
        );
        if (msg !== null) {
          const response = {
            uuid: uuidv4(),
          };

          console.log(
            `[ ${new Date()} ] Message sent: ${JSON.stringify(response)}`,
          );

          ch.sendToQueue(
            msg.properties.replyTo,
            Buffer.from(JSON.stringify(response)),
            {
              correlationId: msg.properties.correlationId,
            },
          );

          ch.ack(msg);
        }
      });
    });
  })
  .catch(console.warn);

client.js

const amqp = require('amqplib');
const EventEmitter = require('events');
const uuid = require('uuid');

const RABBITMQ = 'amqp://guest:guest@localhost:5672';

// pseudo-queue for direct reply-to
const REPLY_QUEUE = 'amq.rabbitmq.reply-to';
const q = 'example';

// Credits for Event Emitter goes to https://github.com/squaremo/amqp.node/issues/259

const createClient = rabbitmqconn =>
  amqp
    .connect(rabbitmqconn)
    .then(conn => conn.createChannel())
    .then(channel => {
      channel.responseEmitter = new EventEmitter();
      channel.responseEmitter.setMaxListeners(0);
      channel.consume(
        REPLY_QUEUE,
        msg => {
          channel.responseEmitter.emit(
            msg.properties.correlationId,
            msg.content.toString('utf8'),
          );
        },
        { noAck: true },
      );
      return channel;
    });

const sendRPCMessage = (channel, message, rpcQueue) =>
  new Promise(resolve => {
    const correlationId = uuid.v4();
    channel.responseEmitter.once(correlationId, resolve);
    channel.sendToQueue(rpcQueue, Buffer.from(message), {
      correlationId,
      replyTo: REPLY_QUEUE,
    });
  });

const init = async () => {
  const channel = await createClient(RABBITMQ);
  const message = { uuid: uuid.v4() };

  console.log(`[ ${new Date()} ] Message sent: ${JSON.stringify(message)}`);

  const respone = await sendRPCMessage(channel, JSON.stringify(message), q);

  console.log(`[ ${new Date()} ] Message received: ${respone}`);

  process.exit();
};

try {
  init();
} catch (e) {
  console.log(e);
}

您将得到如下结果:

2g32fytz

2g32fytz2#

实际上,这不是问题答案,只是供您参考的文本
更改
现在2022 amqplib API有一些小的变化,如下所示:

  • channel.responseEmitter.emitchannel.emit
  • channel.responseEmitter.oncechannel.once
  • channel.responseEmitter.setMaxListenerschannel.setMaxListeners

更多详细信息,请参见official api document

备选方案

并且如果您正在寻找单圈rpc(类似于请求和响应,直接grep响应数据)。
我建议使用channel.get,详情请参阅此处

相关问题