As the title says, I've run into some really weird behavior while reimplementing the MessageChannel that Node.js provides.
The goal of my implementation is to provide better performance for sending UTF-16 strings to another thread.
Quick disclaimer: this is not meant for production use (at least for now). I'm just experimenting with an algorithm / proof of concept and want to implement it myself, so I won't accept answers along the lines of "why are you reinventing the wheel" or "there's already an npm package that fits your use case perfectly".
Algorithm
My idea is to have two handles: one for writing, managed and used by the main thread, and one for reading, handed to a Worker.
They communicate through a SharedArrayBuffer using two typed arrays: a Uint16Array spanning the whole buffer minus 8 bytes, and an Int32Array spanning the last 8 bytes, used to store the writer and reader indexes respectively and thus provide the synchronization mechanism.
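A minimal sketch of that layout (the variable names and the 64-byte size are just for illustration here; the real setup is in the constructor further below, where the 8 bytes are 2 * Int32Array.BYTES_PER_ELEMENT):
const sab = new SharedArrayBuffer(64)
// Data section: UTF-16 code units, everything except the last 8 bytes.
const data = new Uint16Array(sab, 0, (sab.byteLength - 8) / Uint16Array.BYTES_PER_ELEMENT)
// State section: [writerIndex, readerIndex], stored in the last 8 bytes.
const state = new Int32Array(sab, sab.byteLength - 8)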
The writing handle enqueues strings by pushing them onto an array. Once setImmediate fires, it flushes them by writing each string's length at the current write index of the shared buffer and then encoding the string itself one code unit at a time. This is repeated for every string until the queue is finally drained, wrapping around the end of the buffer whenever necessary.
This means that if you write "Hello" and " world!" as two separate strings, at the end of the flush operation the buffer will look like this: [5, 72, 101, 108, 108, 111, 7, 32, 119, 111, 114, 108, 100, 33, ..., 14, 0], where the first element (5) is the length of "Hello", the next 5 numbers are its UTF-16-encoded characters, the following element (7) is the length of " world!", the 7 numbers after it are again the encoded string, and the last two numbers (14 and 0) are the writer and reader indexes respectively. After this flush, the next write operation will start at the 15th element of the buffer.
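As a quick check of the layout above (this snippet is mine, not part of the implementation):
const encode = (s: string) => [s.length, ...Array.from(s, (ch) => ch.charCodeAt(0))]
console.log([...encode("Hello"), ...encode(" world!")])
// -> [ 5, 72, 101, 108, 108, 111, 7, 32, 119, 111, 114, 108, 100, 33 ]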
The reading handle is just a generator that waits to be notified of a change in the writer index, which happens right after the queue is flushed. It reads the value at the current read index as the length of the next string to yield, then decodes and yields the first string of the original queue. This goes on until the reading handle catches up with the writing handle, at which point it goes back to sleep, waiting for a notification before reading again.
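In isolation, the synchronization described above boils down to the following Atomics handshake (a sketch of the pattern only, not the actual class code; the index value is made up for illustration):
const state = new Int32Array(new SharedArrayBuffer(8)) // [writerIndex, readerIndex]

// Writer side (main thread), after copying the code units into the data view:
const newWriterIndex = 14                // hypothetical value, published after a flush
Atomics.store(state, 0, newWriterIndex)  // make the new writer index visible
Atomics.notify(state, 0)                 // wake a reader sleeping on slot 0

// Reader side (worker thread): sleep as long as the writer index still equals
// the position the reader has already consumed.
const alreadyConsumed = Atomics.load(state, 1)
Atomics.wait(state, 0, alreadyConsumed)  // blocks the worker thread until notified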
Implementation
import {nextTick} from "node:process"
import {isMainThread, Worker, workerData, type WorkerOptions as _WO} from "node:worker_threads"
/**
* Key used to access the {@link SharedArrayBuffer} in worker data.
*/
const kSharedBuffer = "__$SharedBuffer$" as const
export class WorkerStream {
////////////////////////////////////////////////////////////////////////////
// region Buffer
////////////////////////////////////////////////////////////////////////////
/**
* The writable section of the {@link SharedArrayBuffer}.
*/
readonly #buffer: Uint16Array
/**
* The section of the {@link SharedArrayBuffer} used to store the atomic W/R indexes
* of the {@link WorkerStream} and the {@link WorkerStreamHandle} respectively.
*/
readonly #stateBuffer: Int32Array
/**
* Wraps an index around the buffer.
*
* @throws {RangeError} If the index is negative.
* @throws {RangeError} If the index needs to be wrapped around more than once.
*/
private wrapIndex(index: number): number {
if (index < 0) {
throw new RangeError("Index cannot be negative")
}
const {length} = this.#buffer
if (index >= length * 2) {
throw new RangeError("Index cannot be wrapped around more than once")
}
return (index >= length) ? index - length : index
}
////////////////////////////////////////////////////////////////////////////
// endregion
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// region Data Queue
////////////////////////////////////////////////////////////////////////////
/**
* The maximum length of a single string in UTF-16 code units.
*/
static readonly MAX_STRING_LENGTH = 10_000
/**
* The queue of strings to be written to the {@link WorkerStreamHandle}.
*/
readonly #dataQueue: string[] = []
/**
* The number of cells in the buffer required to flush the data queue.
*/
#dataQueueSize = 0
/**
* Flushes the queue of strings to the {@link buffer}.
*/
flush() {
const {length: bufferLength} = this.#buffer
let [w] = this.#stateBuffer
while (this.#dataQueue.length > 0) {
const s = this.#dataQueue.shift()!
this.#buffer[w] = s.length
for (let i = 0; i < s.length; i++) {
if (++w === bufferLength) {
w = 0
}
this.#buffer[w] = s.charCodeAt(i)
}
if (++w === bufferLength) {
w = 0
}
}
this.#dataQueueSize = 0
Atomics.store(this.#stateBuffer, 0, w)
Atomics.notify(this.#stateBuffer, 0)
}
////////////////////////////////////////////////////////////////////////////
// endregion
////////////////////////////////////////////////////////////////////////////
/**
* @throws {RangeError} If the buffer size is less than 2 * {@link Int32Array.BYTES_PER_ELEMENT}.
*/
constructor(
workerPath: URL | string,
workerOptions: Omit<_WO, "workerData"> & { workerData?: Record<string | symbol, any> } = {},
unrefWorker = true,
bufferSize: number = 4 * 1024 * 1024,
) {
if (bufferSize < 2 * Int32Array.BYTES_PER_ELEMENT) {
throw new RangeError("Buffer size must be at least 2 * Int32Array.BYTES_PER_ELEMENT")
}
const buffer = new SharedArrayBuffer(bufferSize)
this.#buffer = new Uint16Array(
buffer,
0,
(bufferSize - 2 * Int32Array.BYTES_PER_ELEMENT) / Uint16Array.BYTES_PER_ELEMENT,
)
this.#stateBuffer = new Int32Array(buffer, bufferSize - 2 * Int32Array.BYTES_PER_ELEMENT)
workerOptions.workerData ??= {}
workerOptions.workerData[kSharedBuffer] = buffer
const worker = new Worker(workerPath, workerOptions)
worker.once("exit", () => {
throw new Error("Worker exited unexpectedly")
})
worker.once("error", (err) => {
throw err
})
if (unrefWorker) {
worker.unref()
}
}
////////////////////////////////////////////////////////////////////////////
// region Streaming
////////////////////////////////////////////////////////////////////////////
/**
* Writes a string to the {@link WorkerStream}.
*
* # Summary
*
* This method enqueues a string to be streamed to the {@link WorkerStreamHandle}
* as soon as {@link setImmediate} allows it.
*
* The string is encoded using UTF-16, so it can contain any Unicode character,
* and its length is limited to {@link MAX_STRING_LENGTH}.
*
* If there is not enough space in the buffer to write the string,
* calling this method will return `false` and the string will not be enqueued.
* You are advised to check with {@link canWrite} before calling this method.
*/
write(s: string): boolean {
if (!this.canWrite(s)) {
return false
}
this.#dataQueue.push(s)
if (this.#dataQueueSize === 0) {
this.#dataQueueSize++
setImmediate(() => this.flush())
}
this.#dataQueueSize += s.length + 1
return true
}
/**
* Determines whether there is enough space in the buffer to write a string.
*/
canWrite({length}: string): boolean {
if (length > WorkerStream.MAX_STRING_LENGTH) {
return false
}
const w = this.#stateBuffer[0], r = Atomics.load(this.#stateBuffer, 1)
const end = this.wrapIndex(w + length + 1)
const didNotWrapAroundBuffer = end > w
const endsBehindReader = end < r
const wasAlreadyBehindReader = w < r
if (wasAlreadyBehindReader) {
return endsBehindReader && didNotWrapAroundBuffer
} else {
return endsBehindReader || didNotWrapAroundBuffer
}
}
////////////////////////////////////////////////////////////////////////////
// endregion
////////////////////////////////////////////////////////////////////////////
}
export class WorkerStreamHandle {
////////////////////////////////////////////////////////////////////////////
// region Buffer
////////////////////////////////////////////////////////////////////////////
/**
* The readable section of the {@link SharedArrayBuffer}.
*/
readonly #buffer: Uint16Array
/**
* The portion of the {@link SharedArrayBuffer} used to store the atomic W/R indexes
* of the {@link WorkerStream} and the {@link WorkerStreamHandle} respectively.
*/
readonly #stateBuffer: Int32Array
////////////////////////////////////////////////////////////////////////////
// endregion
////////////////////////////////////////////////////////////////////////////
/**
* @throws {Error} If instantiated from the main thread.
* @throws {Error} If the {@link SharedArrayBuffer} is not found in {@link workerData}.
*/
constructor() {
if (isMainThread) {
throw new Error("WorkerStreamHandle can only be used inside a worker")
}
const buffer = workerData[kSharedBuffer]
if (!(buffer instanceof SharedArrayBuffer)) {
throw new Error("SharedBuffer not found in workerData; did you instantiate WorkerStreamHandle from the right worker?")
}
delete workerData[kSharedBuffer]
const {byteLength: bufferSize} = buffer
this.#buffer = new Uint16Array(
buffer,
0,
(bufferSize - 2 * Int32Array.BYTES_PER_ELEMENT) / Uint16Array.BYTES_PER_ELEMENT,
)
this.#stateBuffer = new Int32Array(buffer, bufferSize - 2 * Int32Array.BYTES_PER_ELEMENT)
}
////////////////////////////////////////////////////////////////////////////
// region Streaming
////////////////////////////////////////////////////////////////////////////
#isStreaming = false
async* [Symbol.asyncIterator](): AsyncIterableIterator<string> {
if (this.#isStreaming) {
throw new Error("Already streaming data")
}
this.#isStreaming = true
const {length: bufferLength} = this.#buffer
while (true) {
let r = this.#stateBuffer[1]
await new Promise<void>((resolve) => {
nextTick(() => {
Atomics.wait(this.#stateBuffer, 0, r)
resolve()
})
})
const length = this.#buffer[r]
const buffer = new Uint16Array(length)
for (let i = 0; i < length; i++) {
if (++r >= bufferLength) {
r = 0
}
buffer[i] = this.#buffer[r]
}
this.#stateBuffer[1] = (++r >= bufferLength) ? 0 : r
yield String.fromCharCode(...buffer)
}
}
static async* incoming(): AsyncIterableIterator<string> {
yield* new WorkerStreamHandle()
}
////////////////////////////////////////////////////////////////////////////
// endregion
////////////////////////////////////////////////////////////////////////////
}
PS: I use private fields to keep the buffers inaccessible from the outside, to prevent malicious interaction with the underlying buffer.
The problem
This implementation worked just fine until yesterday. To make sure the underlying worker received the stream of text, I wrote the following tests:
logger.test.ts
import {equal, ok, throws} from "node:assert/strict"
import {execSync} from "node:child_process"
import {MessageChannel, Worker} from "node:worker_threads"
import {WorkerStream} from "../src/logger/stream"
const testingWorkerPath = new URL("./data/worker.js", import.meta.url)
describe("WorkerStream", () => {
before(() => {
// Suppress the "Worker exited unexpectedly" error
// so that we can manually close Workers to stop tests
// without having Mocha crying about uncaught exceptions
const original = process.listeners("uncaughtException").pop()!
process.removeListener("uncaughtException", original)
process.on("uncaughtException", (err) => {
if (err.message !== "Worker exited unexpectedly") {
original(err, "uncaughtException")
}
})
})
it("writes data to the worker", async () => {
const {port1: workerPort, port2: parentPort} = new MessageChannel()
const stream = new WorkerStream(
testingWorkerPath,
{workerData: {parentPort}, transferList: [parentPort]},
)
stream.write("Hello World!")
const receivedMessage = await new Promise((resolve) => {
workerPort.on("message", resolve)
})
equal(receivedMessage, "Hello World!")
stream.write("done")
})
it("wraps around when it reaches the end of the buffer", async () => {
const DATA = [
"ab".repeat(15),
"bc".repeat(25),
"cd".repeat(35),
"de".repeat(35),
"ef".repeat(35),
"fg".repeat(35),
]
const {port1: workerPort, port2: parentPort} = new MessageChannel()
const stream = new WorkerStream(
testingWorkerPath,
{workerData: {parentPort}, transferList: [parentPort]},
true,
1024,
)
for (const data of DATA) {
const iterations = Math.floor(1024 / (data.length * 2) - 1)
for (let _ = 0; _ < 15; _++) {
for (let i = 0; i < iterations; i++) {
ok(stream.write(data))
}
for (let i = 0; i < iterations; i++) {
equal(
await new Promise((resolve) => {
workerPort.on("message", resolve)
}),
data,
)
}
}
}
stream.write("done")
})
})
data/worker.js
import {isMainThread, workerData} from "node:worker_threads"
import {WorkerStreamHandle} from "../../src/logger/stream.js"
if (isMainThread) {
throw new Error("This file must be run as a worker")
}
/**
* @type {import("node:worker_threads").MessagePort}
*/
const parentPort = workerData.parentPort
/**
* Incoming stream of text.
*/
for await (const chunk of WorkerStreamHandle.incoming()) {
if (chunk === "done") {
break
}
parentPort.postMessage(chunk)
}
All the tests above passed like a charm. Every message that went into the worker was always reported back correctly: no data corruption, no data races, not even a hiccup when wrapping around the buffer.
The problem showed up when I tried to use the WorkerStream in a more "realistic" setting, like the following:
import {isMainThread} from "node:worker_threads"
import {WorkerStreamHandle} from "./logger/stream"
if (isMainThread) {
throw new Error("This file must be run as a worker")
}
for await (const chunk of WorkerStreamHandle.incoming()) {
console.log(chunk)
}
When I tried to run this worker and sent 2 messages, only 1 of them seemed to reach the other end. In short, the problem is that in any setup other than the tests, only the first string makes it to the other side of the channel; every other string seems to just "disappear".
What I've tried
- At first I thought the worker might just need more time to receive and log the second message, but even restarting the worker or artificially keeping the main thread alive with promises/timers/sockets/other streams made no difference.
- Then I tried making the reader's generator synchronous; nothing changed.
- Then it was time to add some console logs to see if I could quickly spot the culprit, and this is where it gets weird: not all of the debug messages were logged, even ones placed right after one another with no code in between.
- I finally decided to fire up the debugger and the profiler, only to find that with the debugger open and at least one breakpoint set, everything works (!??!?)
- I tried dropping the generator in favor of its next() method: when I "unrolled" the generator and manually called its next() method twice in the worker, instead of using the loop statement (for await (of)), to my utter surprise both messages were logged successfully.
- Then I replaced the for await (of) loop with a while (true) loop that manually awaits the next value... and this time it did not work! (Both variants are sketched right after this list.)
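For reference, this is roughly what the two variants above look like (reconstructed from the description, not my exact code):
// Variant that works: "unrolled" manual iteration, no loop construct.
const handle = WorkerStreamHandle.incoming()
console.log((await handle.next()).value)
console.log((await handle.next()).value)

// Variant that does not work: the same calls inside a loop.
while (true) {
  const {value: chunk} = await handle.next()
  if (chunk === "done") break
  console.log(chunk)
}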
So, after this rather long introduction, here is my question:
Why does the reading handle hang/block/stall the worker thread, never yielding anything past the first string, when it is consumed inside a for/while loop, yet work like a charm when iterated and awaited manually, without any looping construct? Why does it work while debugging or inside the tests? Is there a synchronization problem, or is it something in the design of the algorithm itself? Should I drain the buffer all at once, because handling one string at a time is simply not the right approach, or is it fair to expect the buffer to work even when decoding and reading one string at a time?
1 Answer
I finally figured it out!
The "culprit" was the backpressure-handling system built into Node.js streams.
All I had to do was turn the read loop in the worker into a recursive function mediated by setImmediate.
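The exact final code isn't shown here, but a setImmediate-mediated recursive reader along these lines is what that means (a sketch of the approach only; the handle and import names are taken from the question):
import {WorkerStreamHandle} from "./logger/stream"

const handle = WorkerStreamHandle.incoming()

function readNext(): void {
  // Defer each read to a fresh macrotask so the event loop keeps turning
  // between strings instead of being starved by a tight read loop.
  setImmediate(async () => {
    const {value: chunk, done} = await handle.next()
    if (done || chunk === "done") {
      return
    }
    console.log(chunk)
    readNext()
  })
}

readNext()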