javascript 如何将Promise.all()限制为每秒5个promise?

rdrgkggo  于 2023-06-04  发布在  Java
关注(0)|答案(8)|浏览(236)

我有几个项目,我需要查询第三方API,并表示API的调用限制为每秒5次调用。我需要以某种方式将对API的调用限制在每秒最多5次调用。
到目前为止,我只是在一个promise数组上使用了Promise.all(),其中每个promise向API发送一个请求,并在API响应HTTP状态码200时进行解析,并在API响应其他状态码时拒绝。但是,当数组中的项目超过5个时,Promise.all()可能会拒绝。
如何将Promise.all()调用限制为每秒5次调用?

j8ag8udp

j8ag8udp1#

使用ES6而不使用库

export async function asyncForEach(array, callback) {
  for (let index = 0; index < array.length; index++) {
    await callback(array[index], index, array);
  }
}
export function split(arr, n) {
  var res = [];
  while (arr.length) {
    res.push(arr.splice(0, n));
  }
  return res;
}
export const delayMS = (t = 200) => {
  return new Promise(resolve => {
    setTimeout(() => {
      resolve(t);
    }, t);
  });
};
export const throttledPromises = (
  asyncFunction,
  items = [],
  batchSize = 1,
  delay = 0
) => {
  return new Promise(async (resolve, reject) => {
    const output = [];
    const batches= split(items, batchSize);
    await asyncForEach(batches, async (batch) => {
      const promises = batch.map(asyncFunction).map(p => p.catch(reject));
      const results = await Promise.all(promises);
      output.push(...results);
      await delayMS(delay);
    });
    resolve(output);
  });
};
vu8f3i0k

vu8f3i0k2#

我希望这对你有帮助。
也就是说,这将使用Promise.all来解决所有请求,如果你有一个很大的查询列表,这将等待所有的解决,并可能导致你的代码中有很多等待来获得所有的响应。如果其中一个请求被拒绝,Promise.all也会拒绝。
我建议,如果你不需要所有的结果放在一起,最好使用其他的东西,比如lodash debounce或throttle或处理这个问题的框架。

let items = [
    {name: 'item1'}, 
    {name: 'item2'}, 
    {name: 'item3'}, 
    {name: 'item4'}, 
    {name: 'item5'}, 
    {name: 'item6'}
];

// This is the api request that you send and return a promise
function apiCall(item) {
  return new Promise((resolve) => {
    setTimeout(() => resolve(item.name), 1000);
  })
}

new Promise((resolve) => {
  let results = [];

  function sendReq (itemsList, iterate, apiCall) {
    setTimeout(() => {
      // slice itemsList to send request according to the api limit
      let slicedArray = itemsList.slice(iterate * 5, (iterate * 5 + 5));
      result = slicedArray.map(item => apiCall(item));
      results = [...results, ...result];

      // This will resolve the promise when reaches to the last iteration
      if (iterate === Math.ceil(items.length / 5) - 1) {
          resolve(results);
      }
    }, (1000 * iterate)); // every 1000ms runs (api limit of one second)
  }

  // This will make iteration to split array (requests) to chunks of five items 
  for (i = 0; i < Math.ceil(items.length / 5); i++) {
    sendReq(items, i, apiCall);
  }
}).then(Promise.all.bind(Promise)).then(console.log);
// Use Promise.all to wait for all requests to resolve
// To use it this way binding is required
pgccezyw

pgccezyw3#

如果你不太担心按顺序解析promise,你可以在bluebird中使用并发选项。
下面将处理5个查询在同一时间只。

const Promise = require('bluebird');

const buildQueries = (count) => {
  let queries = [];

  for(let i = 0; i < count; i++) {
    queries.push({user: i});
  };

  return queries;
};

const apiCall = (item) => {
  return new Promise(async (resolve, reject) => {
    await Promise.delay(1000);
    resolve(item.user);
  });
};

const queries = buildQueries(20);

Promise.map(queries, async query => {
  console.log( await apiCall(query) );
}, {concurrency: 5});
knpiaxh1

knpiaxh14#

也许我头脑简单,但我写了这个版本,只是把传入的数组分成5个promise的块,并在每个块上执行Promise.all()

utility.throttledPromiseAll = async (promises) => {
  const MAX_IN_PROCESS = 5;
  const results = new Array(promises.length);

  async function doBlock(startIndex) {
    // Shallow-copy a block of promises to work on
    const currBlock = promises.slice(startIndex, startIndex + MAX_IN_PROCESS);
    // Await the completion. If any fail, it will throw and that's good.
    const blockResults = await Promise.all(currBlock);
    // Assuming all succeeded, copy the results into the results array
    for (let ix = 0; ix < blockResults.length; ix++) {
      results[ix + startIndex] = blockResults[ix];
    }
  }

  for (let iBlock = 0; iBlock < promises.length; iBlock += MAX_IN_PROCESS) {
    await doBlock(iBlock);
  }
  return results;
};
4bbkushb

4bbkushb5#

我想你可以把你的问题分成两个:同时不超过5个呼叫,并确保最新的呼叫在最早的呼叫之后1秒才发生。
第一部分很容易用一个令人惊叹的p-limit库来解决--它拥有我所见过的最简单的接口。
对于第二部分,您需要实际跟踪每个呼叫何时开始-即。实现等待函数:基本的伪代码,没有测试过:

import pLimit from 'p-limit';
const apiLimit = pLimit(5);

const startTimes = [];

async function rateLimiter(item) {
  const lastSecond = (new Date().getTime()) - 1000;
  if (startTimes.filter(v => v > lastSecond).length >= 5) {
    await new Promise(r => setTimeout(r, 1000));
  }
  // TODO: cleanup startTimes to avoid memory leak
  startTimes.push(new Date().getTime());
  return apiCall(item);
}

await Promise.all(items.map(v => apiLimit(() => rateLimiter(v))))
gudnpqoy

gudnpqoy6#

我们可以使用生成器来发送组中的promise列表。一旦第一个产量解决了,我们就可以做另一个产量。我们将结果存储在数组中。一旦promiseArray长度等于result长度,我们就可以解析 Package 的Promise。

const fetch = require("isomorphic-fetch");
const totalPromiseLength = 5;
const requestMethod = url => () => fetch(url).then(response => response.json());
let promiseArray = [...new Array(totalPromiseLength).keys()].map(index =>
  requestMethod("https://jsonplaceholder.typicode.com/todos/" + (index + 1))
);
function* chunks(arr, limit) {
  for (let i = 0; i < Math.ceil(arr.length / limit); ++i) {
    yield [...arr].slice(i * limit, i * limit + limit);
  }
}

new Promise(async resolve => {
  let generated = chunks(promiseArray, 2);
  let result = [];
  for (let bla of generated) {
    await Promise.all(bla.map(param => param())).then(response => {
      result = [...result, ...response];
      if (result.length === promiseArray.length) {
        resolve(result);
      }
    });
  }
}).then(response => {
  console.log(response);
});
kq0g1dla

kq0g1dla7#

这是阿德尔的答案,但是使用了typescript类型,并且还修复了传递给回调函数的索引和数组:

async function asyncForEach<T>(array: T[], callback: (item: T, index: number, array: T[]) => Promise<void>) {
  for (let index = 0; index < array.length; index++) {
    await callback(array[index] as T, index, array);
  }
}
function split<T>(arr: T[], n: number): T[][] {
  var res = [];
  while (arr.length) {
    res.push(arr.splice(0, n));
  }
  return res;
}
const delayMS = (t = 200) => {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(t);
    }, t);
  });
};

/**
 * Say you want to call 'parse' on 5 values, but run a maximum of 2 at a time, with 100ms delay between each batch.   Call like:
 *
 * throttlePromises(async (values) => await parse(values), ['1','2','3','4','5'], 2, 100)
 */
export function throttledPromises<T, R>(
  asyncFunction: (item: T, index: number, array: T[]) => Promise<R>,
  items: T[],
  batchSize = 1,
  delay = 0
): Promise<(Awaited<R> | void)[]> {
  return new Promise(async (resolve, reject) => {
    const output: (Awaited<R> | void)[] = [];
    const batches = split(items, batchSize);
    await asyncForEach(batches, async (batch, batchNumber) => {
      const promises = batch
        .map((item, innerIndex) => asyncFunction(item, batchNumber * batchSize + innerIndex, items))
        .map((p) => p.catch(reject));
      const results = await Promise.all(promises);
      output.push(...results);
      if (delay) {
        await delayMS(delay);
      }
    });
    resolve(output);
  });
}
l5tcr1uw

l5tcr1uw8#

我最近将几年来的vanilla ES6 solution I've been using重构为TypeScript,它一直工作得很好。

  • Promise对象根据需要示例化
  • 状态改变时产生结果
  • 依赖原生PromiseSettledResult类型
  • 看起来有点像Array.prototype.reduce()的用法🤷‍♂️
  • throttle.ts*
export enum PromiseState {
  Pending = 'pending',
  Fulfilled = 'fulfilled',
  Rejected = 'rejected',
}

function getPromiseState( promise: Promise<any> ): Promise<PromiseState> {
  const control = Symbol();

  return Promise
    .race([ promise, control ])
    .then( value => ( value === control ) ? PromiseState.Pending : PromiseState.Fulfilled )
    .catch( () => PromiseState.Rejected );
}

export function isFulfilled<T>( promise: PromiseSettledResult<T> ): promise is PromiseFulfilledResult<T> {
  return promise.status === "fulfilled";
}

export function isRejected<T>( promise: PromiseSettledResult<T> ): promise is PromiseRejectedResult {
  return promise.status === "rejected";
}

export async function* throttle<InputType, OutputType>( reservoir: InputType[], promiseFn: ( args: InputType ) => Promise<OutputType>, concurrencyLimit: number ): AsyncGenerator<PromiseSettledResult<OutputType>[], void, PromiseSettledResult<OutputType>[]> {
  let iterable = reservoir.splice( 0, concurrencyLimit ).map( args => promiseFn( args ) );

  while ( iterable.length > 0 ) {
    await Promise.race( iterable );

    const pending: Promise<OutputType>[] = [];
    const resolved: Promise<OutputType>[] = [];

    for ( const currentValue of iterable ) {
      if ( await getPromiseState( currentValue ) === PromiseState.Pending ) {
        pending.push( currentValue );
      } else {
        resolved.push( currentValue );
      }
    }

    iterable = [
      ...pending,
      ...reservoir.splice( 0, concurrencyLimit - pending.length ).map( args => promiseFn( args ) )
    ];

    yield Promise.allSettled( resolved );
  }
}

使用示例:

  • app.ts*
import { throttle, isFulfilled, isRejected } from './throttle';

async function timeout( delay: number ): Promise<string> {
  return new Promise( resolve => {
    setTimeout( () => resolve( `timeout promise with ${ delay } delay resolved` ), delay );
  } );
}

const inputArray: number[] = [ 1200, 1500, 1400, 1300, 1000, 1100, 1200, 1500, 1400, 1300, 1000, 1100 ];

( async () => {

  const timeoutPromises = await throttle<number, string>( inputArray, async item => {
    const result = await timeout( item );

    return `${ result } and ready for for..await..of`;
  }, 5 );

  const messages: string[] = [];

  for await ( const chunk of timeoutPromises ) {
    console.log( chunk.filter( isFulfilled ).map( ({ value }) => value ) );

    console.error( chunk.filter( isRejected ).map( ({ reason }) => reason ) );
  }

})();

相关问题