NodeJS js:防止同时调用多个异步函数

li9yvcax  于 2023-06-05  发布在  Node.js
关注(0)|答案(4)|浏览(482)

在单线程、同步、非递归代码中,我们可以确保对于任何给定的函数,一次不会有多个调用。
但是,在async/await世界中,上述内容不再适用:当我们在异步函数f的执行过程中等待某些东西时,它可能会被再次调用。
我突然想到,使用事件发射器和队列,我们可以为异步函数编写一个 Package 器,以保证它一次不会有多个调用。就像这样:

const events = require('events')

function locked(async_fn) {
    const queue = [] // either actively running or waiting to run
    const omega = new events()

    omega.on('foo', () => {
        if (queue.length > 0) {
            queue[0].emit('bar')
        }
    })

    return function(...args) {
        return new Promise((resolve) => {
            const alpha = new events()
            queue.push(alpha)
            alpha.on('bar', async () => {
                resolve(await async_fn(...args))
                queue.shift()
                omega.emit('foo')
            })
            if (queue.length === 1) omega.emit('foo')
        })
    }
}

这个想法是,如果f是一个异步函数,那么locked(f)是一个做同样事情的函数,除了如果在执行f期间调用f,新的调用直到第一次调用返回才开始。
我怀疑我的解决方案有很大的改进空间,所以我想知道:有更好的方法吗?事实上,Node中是否已经内置了一个,或者可以通过npm获得?
编辑以显示如何使用:

async function f() {
    console.log('f starts')
    await new Promise(resolve => setTimeout(resolve, 1000))
    console.log('f ends')
}

const g = locked(f)

for (let i = 0; i < 3; i++) {
    g()
}

运行这个需要3秒钟,我们得到以下输出:

f starts
f ends
f starts
f ends
f starts
f ends

而如果我们在for循环中将g()替换为f(),则执行时间为1秒,并得到以下结果:

f starts
f starts
f starts
f ends
f ends
f ends

(我意识到这是一个相当小的问题,如果它不适合stackoverflow,我道歉,但我不知道有更好的地方。

cld4siwp

cld4siwp1#

如果你遇到这个问题,下面是代码,它完全符合OP的要求:

const disallowConcurrency = (fn) => {
  let inprogressPromise = Promise.resolve()

  return (...args) => {
    inprogressPromise = inprogressPromise.then(() => fn(...args))
    
    return inprogressPromise
  }
}

这样使用:

const someAsyncFunction = async (arg) => {
    await new Promise( res => setTimeout(res, 1000))
    console.log(arg)
}

const syncAsyncFunction = disallowConcurrency(someAsyncFunction)

syncAsyncFunction('I am called 1 second later')
syncAsyncFunction('I am called 2 seconds later')

您可能还希望将函数名更改为更明确的名称,因为promises实际上与并发性无关。

hgc7kmma

hgc7kmma2#

下面是我之前回答的decorator:(Live Demo

function asyncBottleneck(fn, concurrency = 1) {
  const queue = [];
  let pending = 0;
  return async (...args) => {
    if (pending === concurrency) {
      await new Promise((resolve) => queue.push(resolve));
    }

    pending++;

    return fn(...args).then((value) => {
      pending--;
      queue.length && queue.shift()();
      return value;
    });
  };
}

用途:

const task = asyncBottleneck(async () => {
  console.log("task started");
  await new Promise((resolve) => setTimeout(resolve, 1000));
  console.log("end");
});

task();
task();
task();
task();
bhmjp9jg

bhmjp9jg3#

我可以在这里推荐我的模块吗:https://www.npmjs.com/package/job-pipe
如果你有一个async方法:

const foo = async () => {...}

为它创建管道:

const pipe = createPipe({ maxQueueSize: Infinity })

然后你可以像这样 Package 你的方法:

const limitedFoo = pipe(foo)

然后你可以做这样的魔术:

limitedFoo()
limitedFoo()
limitedFoo()
limitedFoo()
await limitedFoo()

尽管我只在等待最后一个,但由于管道限制,这些函数将逐个执行。
job-pipe允许将多个不同的方法组合到一个管道中。它允许配置X数量的并行作业。此外,您还可以随时监视有多少作业正在运行以及有多少作业在排队。如果需要,您也可以选择将它们全部中止。
我知道这是一个很老的帖子,但我希望它能帮助一些人。

muk1a3rh

muk1a3rh4#

所以,这是一种很简单的方法,但是你也可以缓存函数被调用的事实。

let didRun = false;

async function runMeOnce() {
  if (didRun) return;
  didRun = true;

  ... do stuff
}

await runMeOnce():
await runMeOnce(); // will just return;

我相信有更好的解决方案-但这将工作与很少的努力。

相关问题