我对nodejs流相当陌生,所以请耐心等待。
我正在使用node stream pipeline
方法来复制一个流(特别是一个节点获取的图像,它是一个可读的流)。当我使用同步管道API时,它工作得很好(如,我看到管道成功消息控制台日志记录),但我需要使这个过程异步,因为我一次处理(复制)多个图像。
当我在util.promisify
中 Package 同步方法时,它只是永远挂起-promise似乎永远处于挂起状态(甚至没有抛出我可以看到的错误,这是最令人沮丧的部分)。我在这里错过了什么吗?欢迎提供解决方案的建议,甚至了解如何看到问题所在,因为我甚至没有返回可以尝试调试的错误消息
以下是我尝试过的代码:
import util from 'util';
import { pipeline, PassThrough } from 'stream';
// synchronous
// confirmed that this works
// the images get copied into the streams array, I get a bunch of 'Pipeline succeeded.' console logs
const streams = allKeys.map(() => pipeline(response.body, new PassThrough(),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
}
)
);
// asynchronous
// this does not work, I never get any console logs and the promise just hangs forever
const pipelineAsync = util.promisify(pipeline);
const streams = await Promise.all(allKeys.map(async () => {
try {
const stream = await pipelineAsync(
response.body,
new PassThrough()
)
console.log('Pipeline succeeded.');
return stream;
} catch(e) {
console.log('error!', e)
return null;
}
}
));
此外,如果这是有帮助的,这里有一些注意事项:
1.我在这里使用PassThrough
,因为这些流也必须可读/上传到S3(不包括在上面的代码片段中)
1.另外,我在节点16上,所以我也尝试使用stream/promises
中的pipeline
,得到了相同的挂起结果
1.我还尝试使用同步pipeline
API,并手动将其 Package 在promise中,如下所示:
import { pipeline } from 'stream';
const streams = await Promise.all(allKeys.map(async () => {
console.log('inside the map loop!')
return new Promise((resolve, reject) => {
console.log('we are inside the promise!');
return pipeline(
response.body,
new PassThrough(),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
return reject();
} else {
console.log('Pipeline succeeded.');
return resolve(undefined);
}
}
)
})
}
));
并且我仍然得到相同的挂起行为(我得到'inside the map loop'和'we are inside the promise'控制台日志,但从未得到管道失败或成功的控制台日志。
1条答案
按热度按时间ee7vknir1#
要将一个可读流“管道”到多个可写流中,请在可读流的
data
事件处理程序中执行多个write
命令,并对end
事件执行类似的命令:由于
data
事件处理程序是同步的,但写入是异步操作,因此即使可写流仍忙碌使用前一个数据块,事件处理程序也可能继续写入数据块(s),以使其缓冲区溢出。为了避免这种情况,如果一个可写流的write
方法返回false
,则暂停可读流(意味着它想要阻止更多的数据块),并且只有在所有被阻止的可写流再次被解除阻止并且已经发出drain
事件之后才恢复。还必须考虑错误处理。如果一个可写流失败,其他流是否应该继续?在任何情况下,如果可读流失败,所有可写流都将被销毁: