javascript 工作线程间分担负载的最佳方法

yrdbyhpb  于 2023-01-24  发布在  Java
关注(0)|答案(2)|浏览(85)

在工作线程之间共享线性任务以提高性能的最佳方式是什么?
以下面的基本Deno Web服务器为例:

主线程

// Create an array of four worker threads
const workers = new Array<Worker>(4).fill(
    new Worker(new URL("./worker.ts", import.meta.url).href, {
        type: "module",
    })
);

for await (const req of server) {
    // Pass this request to worker a worker thread
}

工人.ts

self.onmessage = async (req) => {
  //Peform some linear task on the request and make a response
};

分配任务的最佳方式是不是与此沿着?

function* generator(): Generator<number> {
    let i = 0;
    while (true) {
        i == 3 ? (i = 0) : i++;
        yield i;
    }
}

const gen = generator();

const workers = new Array<Worker>(4).fill(
    new Worker(new URL("./worker.ts", import.meta.url).href, {
        type: "module",
    })
);

for await (const req of server) {
    // Pass this request to a worker thread
    workers[gen.next().value].postMessage(req);
}

或者有没有更好的方法来做到这一点?比如说,使用Atomics来确定哪些线程可以自由地接受另一个任务。

ikfrs5lh

ikfrs5lh1#

在使用这样的WorkerThread代码时,我发现分发作业的最佳方法是让WorkerThread在知道它已经完成了上一个作业时向主线程请求一个作业,然后主线程可以向它发送一个新作业来响应该消息。
在主线程中,我维护了一个作业队列和一个等待作业的WorkerThread队列。如果作业队列为空,那么WorkerThread队列中可能会有一些等待作业的workerThread。然后,每当一个作业添加到作业队列中时,代码都会检查是否有一个等待的workerThread,如果有,则将其从队列中删除,并发送给它下一个作业。
每当一个workerThread发送一条消息表明它已经准备好下一个作业时,我们就会检查作业队列,如果那里有作业,它就会被删除并发送给那个工作线程,如果没有,这个工作线程就会被添加到WorkerThread队列中。
整个逻辑非常简洁,不需要原子或共享内存(因为所有内容都通过主进程的事件循环进行门控),也没有太多代码。
我在尝试了其他几种方法之后才找到了这种机制,每种方法都有各自的问题。在一种情况下,我有并发问题,在另一种情况下,我让事件循环挨饿,在另一种情况下,我没有对WorkerThreads进行适当的流控制,让它们不堪重负,负载分布不均匀。

wnrlj8wa

wnrlj8wa2#

Deno中有一些抽象可以非常容易地处理这类需求。特别是考虑到pooledMap功能。
您有一个server,它是一个异步迭代器,您希望利用线程来生成响应,因为每个响应都依赖于大量计算所花费的时间,对吗?
很简单。

import { serve } from "https://deno.land/std/http/server.ts";
import { pooledMap } from "https://deno.land/std@0.173.0/async/pool.ts";

const server = serve({ port: 8000 }),
      ress   = pooledMap( window.navigator.hardwareConcurrency - 1
                        , server
                        , req => new Promise(v => v(respondWith(req))
                        );

for await (const res of ress) {
  // respond with res
}

就是这样。在这个特殊的例子中,repondWith函数承担了繁重的计算来准备你的响应对象。如果它已经是一个异步函数,那么你甚至不需要把它 Package 成一个承诺。很明显,这里我刚刚使用了可用的多个少一个线程,但是它取决于你来决定产生多少个线程。

相关问题