我有一个使用Nodev19.1.0的TS库。该库有一个观察流服务器事件的函数。
服务器提供/events
路由流'application/x-ndjson'
内容,可能是event/ping/...(每x秒发送一次ping对于保持连接有效非常重要)
我的observe函数解析并检查流数据。如果它是一个有效的事件,它将把它传递给回调函数。调用者还收到一个中止函数,按需中止流。
每当我在本地运行测试或通过CI运行测试时,我都会收到以下错误
警告:测试“观察到事件.”在测试结束后生成了异步活动.此活动创建了错误“AbortError:该操作已中止。”并将导致测试失败,但却触发了unhandledRejection事件。
我尝试使用纯JavaScript最小化示例代码,
const assert = require('assert/strict');
const express = require('express');
const { it } = require('node:test');
it('observes events.', async () => {
const expectedEvent = { type: 'event', payload: { metadata: { type: 'entity-created', commandId: 'commandId' } } };
const api = express();
const server = api
.use(express.json())
.post('/events', (request, response) => {
response.writeHead(200, {
'content-type': 'application/x-ndjson',
});
const line = JSON.stringify(expectedEvent) + '\n';
response.write(line);
})
.listen(3000);
let stopObserving = () => {
throw new Error('should never happen');
};
const actualEventPayload = await new Promise(async resolve => {
stopObserving = await observeEvents(async newEvent => {
resolve(newEvent);
});
});
stopObserving();
server.closeAllConnections();
server.close();
assert.deepEqual(actualEventPayload, expectedEvent.payload);
});
const observeEvents = async function (onReceivedFn) {
const abortController = new AbortController();
const response = await fetch('http://localhost:3000/events', {
method: 'POST',
headers: { 'content-type': 'application/json' },
signal: abortController.signal,
});
if (!response.ok) {
throw new Error('error handling goes here - request failed');
}
Promise.resolve().then(async () => {
if (!response.body) {
throw new Error('error handling goes here - missing response body');
}
for await (const item of parseStream(response.body, abortController)) {
switch (item.type) {
case 'event': {
await onReceivedFn(item.payload);
break;
}
case 'ping':
// Intentionally left blank
break;
case 'error':
throw new Error('error handling goes here - stream failed');
default:
throw new Error('error handling goes here - should never happen');
}
}
});
return () => { abortController.abort(); };
};
const parseLine = function () {
return new TransformStream({
transform(chunk, controller) {
try {
const data = JSON.parse(chunk);
// ... check if this is a valid line...
controller.enqueue(data);
} catch (error) {
controller.error(error);
}
},
});
};
const splitLines = function () {
let buffer = '';
return new TransformStream({
transform(chunk, controller) {
buffer += chunk;
const lines = buffer.split('\n');
for (let i = 0; i < lines.length - 1; i++) {
controller.enqueue(lines[i]);
}
buffer = lines.at(-1) ?? '';
},
flush(controller) {
if (buffer.length > 0) {
controller.enqueue(buffer);
}
},
});
};
const parseStream = async function* (stream, abortController) {
let streamReader;
try {
const pipedStream = stream
.pipeThrough(new TextDecoderStream())
.pipeThrough(splitLines())
.pipeThrough(parseLine());
streamReader = pipedStream.getReader();
while (true) {
const item = await streamReader.read();
if (item.done) {
break;
}
yield item.value;
}
} finally {
await streamReader?.cancel();
abortController.abort();
}
};
不幸的是,当运行node --test
时,测试没有完成,我不得不手动取消它。
测试在这些行中断
const actualEventPayload = await new Promise(async resolve => {
stopObserving = await observeEvents(async newEvent => {
resolve(newEvent);
});
});
我认为这是因为Promise永远不会解析。我认为流解析可能有错误,但如果您删除所有流解析的内容并替换
Promise.resolve().then(async () => {
/* ... */
});
与
Promise.resolve().then(async () => {
await onReceivedFn({ metadata: { type: 'entity-created', commandId: 'commandId' }});
});
也不管用有人知道哪里出了问题或者少了什么吗
1条答案
按热度按时间4c8rllxm1#
这里的问题与你的承诺没有解决无关,因为你甚至从来没有达到那一点。
这里的问题是
observeEvents
在测试运行时尚未初始化,因此抛出了ReferenceError: Cannot access 'observeEvents' before initialization
错误。要亲自查看,您可以在文件顶部添加一个简单的
const it = (name, fn) => fn();
存根,然后在没有--test
的情况下运行它。有多种方法可以解决这个问题,最简单的方法是将测试函数移到文件的底部。
如果不想这样做,也可以像这样定义
observeEvents
函数:async function observeEvents(onReceivedFn) {...}
。这样它将立即可用。