NodeJS 如何用Readable Stream节点实现一个流生成器?

50pmv0ei  于 11个月前  发布在  Node.js
关注(0)|答案(2)|浏览(104)

我想做这样的事

const { Readable } = require("stream");

function generatorToStream(generator) {
  return new Readable({
    read() {
      (async () => {
        for await (const result of generator()) {
          if (result.done) {
            this.push(null);
          } else {
            this.push(result.value);
          }
        }
      })();
    }
  });
}

generatorToStream(async function*() {
  const msg1 = await new Promise(resolve =>
    setTimeout(() => resolve("ola amigao"), 2000)
  );
  yield msg1;
  const msg2 = await new Promise(resolve =>
    setTimeout(() => resolve("ola amigao"), 2000)
  );
  yield msg2;

  const msg3 = await new Promise(resolve =>
    setTimeout(() => resolve("ola amigao"), 2000)
  );
  yield msg3;
}).pipe(process.stdout);

字符串
但它不工作,结束事件从来没有被调用,我还没有收到我的终端上的任何数据.
任何解决方案或技巧如何实现它?

omtl5h9j

omtl5h9j1#

我是Scramjet的作者,这是一个功能性的流处理框架,对你来说可能是一个简单的解决方案。
如果你可以在你的项目中只添加3个依赖项,那么这再简单不过了:

const {StringStream} = require("scramjet");

StringStream.from(async function* () {
    yield await something();
    ...
});

字符串
如果你想自己实现它,看看DataStream第112行的源代码-它应该很容易实现。一般来说,你需要实现这样的东西:

function generatorToStream(gen) {
    // obtain an iterator
    const iter = await gen();
    // create your output
    const out = new Passthrough();

    // this IIFE will do all the work
    (async () => {
        let done = false;
        for await (let chunk of iter) {
            // if write returns true, continue, otherwise wait until out is drained.
            if (!out.write(chunk)) await new Promise((res, rej) => this.once("drain", res);
        }
    })()
        // handle errors by pushing them to the stream for example
        .catch(e => out.emit('error', e));

    // return the output stream
    return out;
}


上面的例子或多或少是在超燃冲压发动机中发生的事情-在保持更少的事件处理程序等方面有更多的优化,但是上面的例子在简单的情况下应该工作得很好。

zbsbpyhn

zbsbpyhn2#

下面是非常基本的工作示例:

import { Readable } from 'stream';

const generator = async function* () {
  yield 1;
  yield 2;
  yield 3;
};

const myReadableStream = Readable.from(generator());
myReadableStream.on('data', (data) => console.log(data));

字符串

相关问题