node.js异步函数

unftdfkk  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(551)

下面的代码应该通过向某个远程服务器(kafka代理)生成请求消息来处理http请求,并等待它的响应。当响应消息到达时-它应该作为http响应(json或其他)返回。

router.get('/status', function(req, res, next) {
    // init the producer
    ...

    // 1st async function
    producer.on('ready', function () {
        // some code for generating payloads (data for a message)
        ...

        // 2nd async function
        producer.send(payloads, function (err, data) {
            // some log of success sending message 
            ...

            // 3rd async function
            consumer.on('message', function (message) {
                // got some response message
                res.send("message: " + message);
            });
        });
    });
});

我能把这些同步到一起吗?它不是我的?
编辑:我会尽量说得更清楚。考虑以下代码:

function boo() {
    // part 1 - init some consumer
    console.log("1. finish init");

    // part 2 - This is async function. whenever messages will arrive - this function will be fetched.
    consumer.on('message', function (message) {
        console.log("2. message arrive!");
        return message;
    }

    // part 3
    console.log("3. end function");
    return null;
}

假设第二部分发生在1秒之后。输出为:

1. finish init
3. end function
2. message arrive!

而我的目标是等待异步消息(第2部分)并返回它的值。我怎样才能做到这一点?

rn0zuynd

rn0zuynd1#

问这个问题时,我对node.js完全陌生。看了philip roberts的视频后,我意识到javascript实际上是如何工作的。然后,我用一个全球性的 messageArray &a messageId 计数器。每个用户请求都保存在 messageArray (及其相关处理程序对象,以供以后响应)。然后,消息通过Kafka发送到内部系统组件。在消息从系统返回之前,用户不会得到响应)。当一条消息到达kafka消费者(来自系统组件)时,我们提取相关id并回复相关用户)。代码如下:

var messageId = 0;
var messageArray= [];

router.get('/status', function(req, res, next) {
    var o = {id: messageId, req: req, res: res, next: next};
    messageArray.push(o);
    messageId++;

    // send message with kafka producer into the system internal components - THE MESSAGE CONTAINS THE messageId!
});

consumer.on('message', function (message) {
    // Extract the original messageId from the arrived message and look for it in the messageArray
    var messageId = extractMessageId(message);

    var data = dequeueMessageById(messageId);

    // got some response message
    data.res.send("message: " + message);
});

function dequeueMessageById(messageId) {
    for (var i=0 ; i < messageArray.length ; i++) {
        if (messageArray[i].id == messageId) {
            var messageData = messageArray[i];
            messageArray.splice(index, 1); // remove from array
            return messageData;
        }
    } /* for */

    return null;
}
wvmv3b1j

wvmv3b1j2#

你可以用 async 图书馆。

async.series([
 fn1,
 fn2
 ], function (err, results) {    
 console.log(results);
});

或者你可以用https://github.com/andyshin/sequenty

var sequenty = require('sequenty'); 

function f1(cb) // cb: callback by sequenty
{
  console.log("I'm f1");
  cb(); // please call this after finshed
}

function f2(cb)
{
  console.log("I'm f2");
  cb();
}

sequenty.run([f1, f2]);

相关问题