如何在Node.js中使用stream.Writable的drain事件

ajsxfq5m  于 2022-11-29  发布在  Node.js
关注(0)|答案(5)|浏览(171)

在Node.js中,我使用fs.createWriteStream方法将数据追加到本地文件中。在Node文档中,他们提到了使用fs.createWriteStream时的drain事件,但我不理解它。

var stream = fs.createWriteStream('fileName.txt');
var result = stream.write(data);

在上面的代码中,如何使用drain事件?下面的事件使用正确吗?

var data = 'this is my data';
if (!streamExists) {
  var stream = fs.createWriteStream('fileName.txt');
}

var result = stream.write(data);
if (!result) {
  stream.once('drain', function() {
    stream.write(data);
  });
}
xienkqul

xienkqul1#

drain事件用于可写流的内部缓冲区已清空时。
只有当内部缓冲区的大小曾经超过其highWaterMark属性时,才会发生这种情况,highWaterMark属性是可写入数据流的内部缓冲区中可以储存的最大数据字节数,直到它停止从数据来源阅读为止。
出现这种情况的原因可能是由于设置涉及从一个流阅读数据源的速度快于将其写入另一个资源的速度。例如,以两个流为例:

var fs = require('fs');

var read = fs.createReadStream('./read');
var write = fs.createWriteStream('./write');

现在,假设文件read位于SSD上,读取速度为500 MB/s,而write位于HDD上,写入速度仅为150MB/s。写入流将无法跟上,并开始将数据存储到内部缓冲区中。一旦缓冲区达到highWaterMark(默认为16 KB),写入将开始返回false。并且流将在内部排队清空。一旦内部缓冲区的长度为0,则会触发drain事件。
排水管的工作原理如下:

if (state.length === 0 && state.needDrain) {
  state.needDrain = false;
  stream.emit('drain');
}

以下是漏极的先决条件,漏极是writeOrBuffer函数的一部分:

var ret = state.length < state.highWaterMark;
state.needDrain = !ret;

要了解如何使用drain事件,请参考Node.js文档中的示例。

function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i -= 1;
      if (i === 0) {
        // last time!
        writer.write(data, encoding, callback);
      } else {
        // see if we should continue, or wait
        // don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early!
      // write some more once it drains
      writer.once('drain', write);
    }
  }
}

该函数的目标是向可写流写入1,000,000次。所发生的是将变量ok设置为true,并且仅当ok为true时才执行循环。对于每次循环迭代,ok的值被设置为stream.write()的值。如果需要drain,则返回false。如果ok变为false,则drain的事件处理程序等待,并在激发时恢复写入。
具体到您的代码,您不需要使用drain事件,因为您只在打开流之后写一次,因为您还没有向流写入任何内容,内部缓冲区是空的,并且您必须以块的形式写入至少16 KB,才能触发drain事件。drain事件用于多次写入比可写流的highWaterMark设置更多的数据。

rks48beu

rks48beu2#

假设你正在连接两个带宽非常不同的流,比如说,上传一个本地文件到一个慢速服务器,(快速的)文件流发送数据的速度比(慢速的)套接字流消耗数据的速度快。
在这种情况下,node.js会将数据保留在内存中,直到慢速流有机会处理它。如果文件非常大,这可能会出现问题。
为了避免这种情况,Stream.write会在基础系统缓冲区已满时传回false。如果您停止写入,数据流稍后会发出drain事件,指出系统缓冲区已清空,适合再次写入。
您可以使用pause/resume可读流并控制可读流的带宽。
更好的方法是:您可以使用readable.pipe(writable),它将为您完成此操作。

EDIT:您的程式码中有错误:无论write返回什么,数据都已经写入。不需要重试。在本例中,您将写入data两次。

类似这样的方法可以奏效:

var packets = […],
    current = -1;

function niceWrite() {
  current += 1;

  if (current === packets.length)
    return stream.end();

  var nextPacket = packets[current],
      canContinue = stream.write(nextPacket);

  // wait until stream drains to continue
  if (!canContinue)
    stream.once('drain', niceWrite);
  else
    niceWrite();
}
qpgpyjmq

qpgpyjmq3#

下面是一个带有async/await的版本

const write = (writer, data) => {
  return new Promise((resolve) => {
    if (!writer.write(data)) {
      writer.once('drain', resolve)
    }
    else {
      resolve()
    }
  })
}

// usage
const run = async () => {
  const write_stream = fs.createWriteStream('...')
  const max = 1000000
  let current = 0
  while (current <= max) {
    await write(write_stream, current++)
  }
}

https://gist.github.com/stevenkaspar/509f792cbf1194f9fb05e7d60a1fbc73

7gs2gvoe

7gs2gvoe4#

这是一个使用Promises(async/await)的速度优化版本。调用者必须检查是否返回了promise,只有在这种情况下才需要调用await。每次调用时执行await会使程序的速度降低3倍...

const write = (writer, data) => {
    // return a promise only when we get a drain
    if (!writer.write(data)) {
        return new Promise((resolve) => {
            writer.once('drain', resolve)
        })
    }
}

// usage
const run = async () => {
    const write_stream = fs.createWriteStream('...')
    const max = 1000000
    let current = 0
    while (current <= max) {
        const promise = write(write_stream, current++)
        // since drain happens rarely, awaiting each write call is really slow.
        if (promise) {
            // we got a drain event, therefore we wait
            await promise
        }
    }
}
c6ubokkw

c6ubokkw5#

解释得好!
管道是处理背压的最佳解决方案!
https://nodejs.org/es/docs/guides/backpressuring-in-streams/

相关问题