限制Node.js中的异步调用

ej83mcc0  于 2022-12-29  发布在  Node.js
关注(0)|答案(5)|浏览(487)

我有一个Node.js应用程序,它可以从本地获取文件列表并将它们上传到服务器,这个列表可能包含数千个文件。

for (var i = 0; i < files.length; i++) {
   upload_file(files[i]);
}

如果我对数千个文件执行这个命令,upload_file将同时被调用数千次,并且很可能会死掉(或者至少会挣扎)。在同步环境中,我们会创建一个线程池,并将其限制在一定数量的线程。有没有简单的方法来限制一次执行多少异步调用?

bt1cpqcv

bt1cpqcv1#

像往常一样,我推荐Caolan McMahon的async module
upload_file函数接受回调作为其第二个参数:

var async = require("async");

function upload_file(file, callback) {
    // Do funky stuff with file
    callback();
}

var queue = async.queue(upload_file, 10); // Run ten simultaneous uploads

queue.drain = function() {
    console.log("All files are uploaded");
};

// Queue your files for upload
queue.push(files);

queue.concurrency = 20; // Increase to twenty simultaneous uploads
guicsvcw

guicsvcw2#

上面的答案,re:NPM上的async是最好的答案,但是如果您想了解有关控制流的更多信息:
你应该研究一下控制流模式。在Chapter 7 of Mixu's Node Book中有一个关于控制流模式的精彩讨论。也就是说,我会看看7.2.3中的例子:* 受限并行-循环的异步、并行、并发受限 *。
我改编了他的例子:

function doUpload() {
    // perform file read & upload here...
}

var files   = [...];
var limit   = 10;       // concurrent read / upload limit
var running = 0;        // number of running async file operations

function uploader() {
    while(running < limit && files.length > 0) {
        var file = files.shift();
        doUpload(file, function() {
            running--;
            if(files.length > 0)
                uploader();
        });
        running++;
    }
}

uploader();
r3i60tvu

r3i60tvu3#

你应该试试排队,我假设upload_file()完成后会触发一个回调,类似这样的东西应该可以完成这个任务(未经测试):

function upload_files(files, maxSimultaneousUploads, callback) {
    var runningUploads = 0,
        startedUploads = 0,
        finishedUploads = 0;

    function next() {
        runningUploads--;
        finishedUploads++;

        if (finishedUploads == files.length) {
            callback();
        } else {
            // Make sure that we are running at the maximum capacity.
            queue();
        }
    }

    function queue() {
        // Run as many uploads as possible while not exceeding the given limit.
        while (startedUploads < files.length && runningUploads < maxSimultaneousUploads) {
            runningUploads++;
            upload_file(files[startedUploads++], next);
        }
    }

    // Start the upload!
    queue();
}
zmeyuzjn

zmeyuzjn4#

其他的答案似乎都过时了,这个问题可以用async中的paralleLimit很容易的解决,下面是使用方法,我还没有测试。

var tasks = files.map(function(f) {
    return function(callback) {
        upload_file(f, callback)
    }
});

parallelLimit(tasks, 10, function(){
});
9avjhtql

9avjhtql5#

    • 没有外部库。只有普通的JS。**

它可以用递归来解决。
其思想是,最初我们立即启动最大允许上载次数,并且每个请求都应该在其完成时递归地启动一个新上载。
在本例中,我将成功的响应与错误一起填充,并执行所有请求,但如果您希望在第一次失败时终止批量上载,则可以稍微修改算法。

async function batchUpload(files, limit) {
  limit = Math.min(files.length, limit);

  return new Promise((resolve, reject) => {
    const responsesOrErrors = new Array(files.length);
    let startedCount = 0;
    let finishedCount = 0;
    let hasErrors = false;

    function recursiveUpload() {
      let index = startedCount++;

      uploadFile(files[index])
        .then(res => {
          responsesOrErrors[index] = res;
        })
        .catch(error => {
          responsesOrErrors[index] = error;
          hasErrors = true;
        })
        .finally(() => {
          finishedCount++;
          if (finishedCount === files.length) {
            hasErrors ? reject(responsesOrErrors) : resolve(responsesOrErrors);
          } else if (startedCount < files.length) {
            recursiveUpload();
          }
        });
    }

    for (let i = 0; i < limit; i++) {
      recursiveUpload();
    }
  });
}

async function uploadFile(file) {
  console.log(`${file} started`);
  const delay = Math.floor(Math.random() * 1500);
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (delay <= 1000) {
        console.log(`${file} finished successfully`);
        resolve(`${file} success`);
      } else {
        console.log(`${file} finished with error`);
        reject(`${file} error`);
      }
    }, delay);
  });
}

const files = new Array(10).fill('file').map((file, index) => `${file}_${index + 1}`);

batchUpload(files, 3)
  .then(responses => console.log('All successfull', responses))
  .catch(responsesWithErrors => console.log('All with several failed', responsesWithErrors));

相关问题