NodeJS 如何使用Express流式传输x-ndjson内容并解析流式传输的数据?

hgc7kmma  于 2022-11-29  发布在  Node.js
关注(0)|答案(1)|浏览(152)

我有一个使用Nodev19.1.0的TS库。该库有一个观察流服务器事件的函数。
服务器提供/events路由流'application/x-ndjson'内容,可能是event/ping/...(每x秒发送一次ping对于保持连接有效非常重要)
我的observe函数解析并检查流数据。如果它是一个有效的事件,它将把它传递给回调函数。调用者还收到一个中止函数,按需中止流。
每当我在本地运行测试或通过CI运行测试时,我都会收到以下错误
警告:测试“观察到事件.”在测试结束后生成了异步活动.此活动创建了错误“AbortError:该操作已中止。”并将导致测试失败,但却触发了unhandledRejection事件。
我尝试使用纯JavaScript最小化示例代码,

const assert = require('assert/strict');
const express = require('express');
const { it } = require('node:test');

it('observes events.', async () => {
    const expectedEvent = { type: 'event', payload: { metadata: { type: 'entity-created', commandId: 'commandId' } } };

    const api = express();
    const server = api
        .use(express.json())
        .post('/events', (request, response) => {
            response.writeHead(200, {
                'content-type': 'application/x-ndjson',
            });

            const line = JSON.stringify(expectedEvent) + '\n';
            response.write(line);
        })
        .listen(3000);

    let stopObserving = () => {
        throw new Error('should never happen');
    };

    const actualEventPayload = await new Promise(async resolve => {
        stopObserving = await observeEvents(async newEvent => {
            resolve(newEvent);
        });
    });

    stopObserving();
    server.closeAllConnections();
    server.close();

    assert.deepEqual(actualEventPayload, expectedEvent.payload);
});

const observeEvents = async function (onReceivedFn) {
    const abortController = new AbortController();

    const response = await fetch('http://localhost:3000/events', {
        method: 'POST',
        headers: { 'content-type': 'application/json' },
        signal: abortController.signal,
    });

    if (!response.ok) {
        throw new Error('error handling goes here - request failed');
    }

    Promise.resolve().then(async () => {
        if (!response.body) {
            throw new Error('error handling goes here - missing response body');
        }

        for await (const item of parseStream(response.body, abortController)) {
            switch (item.type) {
                case 'event': {
                    await onReceivedFn(item.payload);

                    break;
                }
                case 'ping':
                    // Intentionally left blank
                    break;
                case 'error':
                    throw new Error('error handling goes here - stream failed');
                default:
                    throw new Error('error handling goes here - should never happen');
            }
        }
    });

    return () => { abortController.abort(); };
};

const parseLine = function () {
    return new TransformStream({
        transform(chunk, controller) {
            try {
                const data = JSON.parse(chunk);

                // ... check if this is a valid line...

                controller.enqueue(data);
            } catch (error) {
                controller.error(error);
            }
        },
    });
};

const splitLines = function () {
    let buffer = '';

    return new TransformStream({
        transform(chunk, controller) {
            buffer += chunk;

            const lines = buffer.split('\n');

            for (let i = 0; i < lines.length - 1; i++) {
                controller.enqueue(lines[i]);
            }

            buffer = lines.at(-1) ?? '';
        },
        flush(controller) {
            if (buffer.length > 0) {
                controller.enqueue(buffer);
            }
        },
    });
};

const parseStream = async function* (stream, abortController) {
    let streamReader;

    try {
        const pipedStream = stream
            .pipeThrough(new TextDecoderStream())
            .pipeThrough(splitLines())
            .pipeThrough(parseLine());

        streamReader = pipedStream.getReader();

        while (true) {
            const item = await streamReader.read();

            if (item.done) {
                break;
            }

            yield item.value;
        }
    } finally {
        await streamReader?.cancel();
        abortController.abort();
    }
};

不幸的是,当运行node --test时,测试没有完成,我不得不手动取消它。
测试在这些行中断

const actualEventPayload = await new Promise(async resolve => {
    stopObserving = await observeEvents(async newEvent => {
        resolve(newEvent);
    });
});

我认为这是因为Promise永远不会解析。我认为流解析可能有错误,但如果您删除所有流解析的内容并替换

Promise.resolve().then(async () => {
    /* ... */
});

Promise.resolve().then(async () => {
    await onReceivedFn({ metadata: { type: 'entity-created', commandId: 'commandId' }});
});

也不管用有人知道哪里出了问题或者少了什么吗

4c8rllxm

4c8rllxm1#

这里的问题与你的承诺没有解决无关,因为你甚至从来没有达到那一点。
这里的问题是observeEvents在测试运行时尚未初始化,因此抛出了ReferenceError: Cannot access 'observeEvents' before initialization错误。
要亲自查看,您可以在文件顶部添加一个简单的const it = (name, fn) => fn();存根,然后在没有--test的情况下运行它。
有多种方法可以解决这个问题,最简单的方法是将测试函数移到文件的底部。
如果不想这样做,也可以像这样定义observeEvents函数:async function observeEvents(onReceivedFn) {...}。这样它将立即可用。

相关问题