nodejs流水线异步流

shstlldc  于 2023-04-05  发布在  Node.js
关注(0)|答案(1)|浏览(155)

我对nodejs流相当陌生,所以请耐心等待。
我正在使用node stream pipeline方法来复制一个流(特别是一个节点获取的图像,它是一个可读的流)。当我使用同步管道API时,它工作得很好(如,我看到管道成功消息控制台日志记录),但我需要使这个过程异步,因为我一次处理(复制)多个图像。
当我在util.promisify中 Package 同步方法时,它只是永远挂起-promise似乎永远处于挂起状态(甚至没有抛出我可以看到的错误,这是最令人沮丧的部分)。我在这里错过了什么吗?欢迎提供解决方案的建议,甚至了解如何看到问题所在,因为我甚至没有返回可以尝试调试的错误消息
以下是我尝试过的代码:

import util from 'util';
import { pipeline, PassThrough } from 'stream';

// synchronous 
// confirmed that this works
// the images get copied into the streams array, I get a bunch of 'Pipeline succeeded.' console logs
const streams = allKeys.map(() => pipeline(response.body, new PassThrough(), 
  (err) => {
    if (err) {
       console.error('Pipeline failed.', err);
     } else {
       console.log('Pipeline succeeded.');
      }
    }
  )
);

// asynchronous
// this does not work, I never get any console logs and the promise just hangs forever
const pipelineAsync = util.promisify(pipeline);
  const streams = await Promise.all(allKeys.map(async () => {
    try {
      const stream = await pipelineAsync(
        response.body,
        new PassThrough()
       )
      console.log('Pipeline succeeded.');
      return stream;
    } catch(e) {
      console.log('error!', e)
      return null;
    }
  }
));

此外,如果这是有帮助的,这里有一些注意事项:
1.我在这里使用PassThrough,因为这些流也必须可读/上传到S3(不包括在上面的代码片段中)
1.另外,我在节点16上,所以我也尝试使用stream/promises中的pipeline,得到了相同的挂起结果
1.我还尝试使用同步pipeline API,并手动将其 Package 在promise中,如下所示:

import { pipeline } from 'stream';

const streams = await Promise.all(allKeys.map(async () => {
  console.log('inside the map loop!')
  return new Promise((resolve, reject) => {
    console.log('we are inside the promise!');
    return pipeline(
      response.body,
      new PassThrough(),
      (err) => {
         if (err) {
           console.error('Pipeline failed.', err);
           return reject();
         } else {
          console.log('Pipeline succeeded.');
          return resolve(undefined);
         }
       }
     )
   })
 }
));

并且我仍然得到相同的挂起行为(我得到'inside the map loop'和'we are inside the promise'控制台日志,但从未得到管道失败或成功的控制台日志。

ee7vknir

ee7vknir1#

要将一个可读流“管道”到多个可写流中,请在可读流的data事件处理程序中执行多个write命令,并对end事件执行类似的命令:

var blocked = 0;
function resume() {
  if (--blocked === 0) readable.resume();
}
writable1.on("drain", resume);
writable2.on("drain", resume);
readable.on("data", function(chunk) {
  if (!writable1.write(chunk)) blocked++;
  if (!writable2.write(chunk)) blocked++;
  if (blocked > 0) readable.pause();
})
.on("end", function() {
  writable1.end();
  writable2.end();
});

由于data事件处理程序是同步的,但写入是异步操作,因此即使可写流仍忙碌使用前一个数据块,事件处理程序也可能继续写入数据块(s),以使其缓冲区溢出。为了避免这种情况,如果一个可写流的write方法返回false,则暂停可读流(意味着它想要阻止更多的数据块),并且只有在所有被阻止的可写流再次被解除阻止并且已经发出drain事件之后才恢复。
还必须考虑错误处理。如果一个可写流失败,其他流是否应该继续?在任何情况下,如果可读流失败,所有可写流都将被销毁:

readable.on("error", function(err) {
  writable1.destroy(err);
  writable2.destroy(err);
});

相关问题