NodeJS 批量发送API调用

8i9zcol2  于 2023-08-04  发布在  Node.js
关注(0)|答案(5)|浏览(138)

我目前正在尝试模拟50万个IoT设备,以便使用nodejs将有效负载推送到Azure IoT Hub。由于节点本质上是多线程的,它的数据会淹没IoT集线器,我会收到网络错误。
我还尝试了async/await方法,但这需要花费大量时间将数据推送到IoT Hub。
有没有一种方法可以只并行运行100个调用,等待所有调用完成,然后在节点中运行下一个100个调用?
非常感谢!

arknldoa

arknldoa1#

将批处理构建为Promise s的嵌套数组,然后在await s为每个Promise.all解析的循环中对每个批处理使用Promise.all

// This is a mock request function, could be a `request` call 
// or a database query; whatever it is, it MUST return a Promise.
const sendRequest = () => {
  return new Promise((resolve) => {
    setTimeout(() => {
      console.log('request sent')
      resolve()
    }, 1000)
  })
}

// 5 batches * 2 requests = 10 requests.
const batches = Array(5).fill(Array(2).fill(sendRequest))

;(async function() {
  for (const batch of batches) {
    try {
      console.log('-- sending batch --')
      await Promise.all(batch.map(f => f()))  
    } catch(err) {
      console.error(err)
    }
  }
})()

字符串

bybem2ql

bybem2ql2#

如果你使用的是lodash,你可以使用chunk来简化它,chunk会把一个数组划分成指定的最大大小的块
所以在你的情况下你可以这样使用它
变量调用(比如说550个数组)

const batchCalls = _.chunk(calls, 100);

for (const batchCall of batchCalls) {
  await Promise.all(batchCall.map(call => call())) // makes a hundred calls in series
}

字符串

6g8kf2rb

6g8kf2rb3#

您可以很容易地将bluebird Promise's map与并发选项一起使用。在拾取下一批之前,这将处理并发中提到的最大记录数。示例:
第一个月

bbmckpt7

bbmckpt74#

limited-request-queue可用于对请求进行排队。有一些选项可用于设置任何给定时间的最大连接数。下面是我们用来每秒发送5个请求的代码。此外,在任何给定时间内只会发送5个请求。
limited-request-queue

/* 
   Request passed to Targer App (5 requests per seconds) 
   Get the response for each request and passed the response to Source App
   maxSockets: The maximum number of connections allowed at any given time. A value of 0 will prevent anything from going out. A value of Infinity will provide no concurrency limiting.
   maxSocketsPerHost:The maximum number of connections per host allowed at any given time. A value of 0 will prevent anything from going out. A value of Infinity will provide no per-host concurrency limiting.
   rateLimit: The number of milliseconds to wait before each maxSocketsPerHost
   */
var queue1 = new RequestQueue({'maxSockets': 5, 'maxSocketsPerHost': 5, 'rateLimit': 1000}, {
  item: function(input, done) {
      request(input.url, function(error, response) {
        input.res.send(response.body);
        done();
      });
  },
  end: function() {
      console.log("Queue  1 completed!");
  }
});

//To queue request - A for loop could be used to send multiple request
queue1.enqueue({'url': ''});

字符串

kiayqfof

kiayqfof5#

如果我没弄错的话,你可以使用项目的“数组”和Promise.all()方法(或者在你的情况下,.allSettled()只查看每个调用的结果),然后像这样处理其中的每个:

function chunk (items, size) {  
  const chunks = [];
  items = [].concat(...items);

  while (items.length) { chunks.push(items.splice(0, size)); }

  return chunks;
}
async function ProcessDevice(device) {  
    // do your work here
}

// splice your items into chunks of 100, then process each chunk
// catching the result of each ProcessDevice in the chunk.map
// the results of the chunk are passed into the .then( )
// and you have a .catch( ) in case there's an error anywhere in the items
var jobArray = chunk(items,100); 
for (let i = 0; i < jobArray.length; i++) {
    Promise.allSettled(
        jobArray[i].map(ja => ProcessDevice(ja)))
    .then(function(results) { console.log("PromiseResults: " + results); })
    .catch((err) => { console.log("error: " + err); });
}

字符串

相关问题