node.js流的错误处理

n1bvdmb6  于 2023-03-17  发布在  Node.js
关注(0)|答案(9)|浏览(165)

处理流错误的正确方法是什么?我已经知道有一个“error”事件可以监听,但是我想知道一些关于任意复杂情况的更多细节。
对于初学者,当你想做一个简单的管链时,你会怎么做:
input.pipe(transformA).pipe(transformB).pipe(transformC)...
如何正确地创建其中一个转换,以便正确地处理错误?
更多相关问题:

  • 当一个错误发生时,“结束”事件会发生什么?2它永远不会被触发吗?3它有时会被触发吗?4它是否依赖于转换/流?5这里的标准是什么?
  • 是否有通过管道传播错误的机制?
  • 域能有效地解决这个问题吗?2举例会很好。
  • 从“error”事件中产生的错误有堆栈跟踪吗?有时候?从来没有?有办法从中得到一个吗?
bis0qfac

bis0qfac1#

转变

转换流既可读又可写,因此是非常好的“中间”流。因此,它们有时被称为through流。它们在以下方面类似于双工流:除了它们提供了一个很好的接口来操作数据而不是仅仅发送数据。转换流的目的是在数据通过流管道传输时操作数据。例如,您可能希望执行一些异步调用,或者派生几个字段,重新Map一些内容,等等。

关于如何创建一个转换流,请看这里和这里。你所要做的就是:
1.包括流模块
1.示例化(或继承)Transform类
1.实现一个接受(chunk, encoding, callback)_transform方法。
块就是你的数据。如果你在objectMode = true中工作,大多数时候你不需要担心编码。当你处理完块时,回调被调用。然后这个块被推到下一个流。
如果你想要一个很好的帮助模块,使你能够非常非常容易地通过流,我建议through2
对于错误处理,请继续阅读。

管道

在管道链中,处理错误确实不是小事。根据this thread。pipe()不是为转发错误而构建的。所以类似于...

var a = createStream();
a.pipe(b).pipe(c).on('error', function(e){handleError(e)});

...只会侦听流c上的错误。如果在a上发出错误事件,则不会向下传递,实际上会抛出。要正确执行此操作:

var a = createStream();
a.on('error', function(e){handleError(e)})
.pipe(b)
.on('error', function(e){handleError(e)})
.pipe(c)
.on('error', function(e){handleError(e)});

现在,尽管第二种方法更冗长,但至少可以保留错误发生的上下文,这通常是一件好事。
如果您只想捕获目标位置的错误,而不太关心错误发生的位置,那么我发现event-stream库很有帮助。

结束

当一个错误事件被激发时,结束事件不会被激发(显式)。错误事件的发出将结束流。

域名

根据我的经验,域在大多数情况下都能很好地工作。如果你有一个未处理的错误事件(即在没有监听器的流上发出错误),服务器可能会崩溃。现在,正如上面的文章所指出的,你可以将流 Package 在一个域中,这样就可以正确地捕获所有错误。

var d = domain.create();
 d.on('error', handleAllErrors);
 d.run(function() {
     fs.createReadStream(tarball)
       .pipe(gzip.Gunzip())
       .pipe(tar.Extract({ path: targetPath }))
       .on('close', cb);
 });
  • 以上代码示例来自此帖子

域的美妙之处在于它们将保留堆栈跟踪,尽管事件流在这方面也做得很好。
要进一步阅读,请查看stream-handbook 1。相当深入,但超级有用,并给出了一些很好的链接,许多有用的模块。
1:* 注:此链接指向archive.org,因为原始GitHub repo已于2022年8月左右删除。*

prdp8dxp

prdp8dxp2#

如果您使用的是node〉= v10.0.0,则可以使用stream.pipeline和stream.finished。
例如:

const { pipeline, finished } = require('stream');

pipeline(
  input, 
  transformA, 
  transformB, 
  transformC, 
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
});

finished(input, (err) => {
  if (err) {
    console.error('Stream failed', err);
  } else {
    console.log('Stream is done reading');
  }
});

有关详细讨论,请参阅此github PR

inb24sb2

inb24sb23#

域名是不赞成的.你不需要他们.
对于这个问题,转换或可写之间的区别并不重要。
mshell_lauren的答案很好,但是作为一种替代方法,你也可以显式地监听你认为可能出错的每个流上的错误事件,并且如果你愿意的话,重用处理函数。

var a = createReadableStream()
var b = anotherTypeOfStream()
var c = createWriteStream()

a.on('error', handler)
b.on('error', handler)
c.on('error', handler)

a.pipe(b).pipe(c)

function handler (err) { console.log(err) }

这样做可以防止其中一个流触发错误事件时出现臭名昭著的未捕获异常

vql8enpb

vql8enpb4#

来自整个链的错误可以使用简单的函数传播到最右边的流:

function safePipe (readable, transforms) {
    while (transforms.length > 0) {
        var new_readable = transforms.shift();
        readable.on("error", function(e) { new_readable.emit("error", e); });
        readable.pipe(new_readable);
        readable = new_readable;
    }
    return readable;
}

其可以像这样使用:

safePipe(readable, [ transform1, transform2, ... ]);
np8igboo

np8igboo5#

.on("error", handler)只处理流错误,但如果您使用自定义的Transform流,.on("error", handler)不会捕捉_transform函数内部发生的错误。因此,您可以这样做来控制应用程序流:-
_transform函数中的this关键字引用Stream本身,它是EventEmitter。因此,您可以使用try catch(如下所示)捕获错误,然后将其传递给自定义事件处理程序。

// CustomTransform.js
CustomTransformStream.prototype._transform = function (data, enc, done) {
  var stream = this
  try {
    // Do your transform code
  } catch (e) {
    // Now based on the error type, with an if or switch statement
    stream.emit("CTError1", e)
    stream.emit("CTError2", e)
  }
  done()
}

// StreamImplementation.js
someReadStream
  .pipe(CustomTransformStream)
  .on("CTError1", function (e) { console.log(e) })
  .on("CTError2", function (e) { /*Lets do something else*/ })
  .pipe(someWriteStream)

这样,你就可以将逻辑和错误处理程序分开,也可以选择只处理某些错误而忽略其他错误。

更新

备选方案:RXJS可观察

bakd9h0s

bakd9h0s6#

使用multipipe包将多个流合并为一个双工流。并在一个位置处理错误。

const pipe = require('multipipe')

// pipe streams
const stream = pipe(streamA, streamB, streamC) 

// centralized error handling
stream.on('error', fn)
whitzsjs

whitzsjs7#

使用Node.js模式,方法是创建一个Transform流机制,并使用一个参数调用其回调done,以便传播错误:

var transformStream1 = new stream.Transform(/*{objectMode: true}*/);

transformStream1.prototype._transform = function (chunk, encoding, done) {
  //var stream = this;

  try {
    // Do your transform code
    /* ... */
  } catch (error) {
    // nodejs style for propagating an error
    return done(error);
  }

  // Here, everything went well
  done();
}

// Let's use the transform stream, assuming `someReadStream`
// and `someWriteStream` have been defined before
someReadStream
  .pipe(transformStream1)
  .on('error', function (error) {
    console.error('Error in transformStream1:');
    console.error(error);
    process.exit(-1);
   })
  .pipe(someWriteStream)
  .on('close', function () {
    console.log('OK.');
    process.exit();
  })
  .on('error', function (error) {
    console.error(error);
    process.exit(-1);
   });
ngynwnxp

ngynwnxp8#

const http = require('http');
const fs = require('fs');
const server = http.createServer();

server.on('request',(req,res)=>{
    const readableStream = fs.createReadStream(__dirname+'/README.md');
    const writeableStream = fs.createWriteStream(__dirname+'/assets/test.txt');
    readableStream
    .on('error',()=>{
        res.end("File not found")
    })
    .pipe(writeableStream)
    .on('error',(error)=>{
        console.log(error)
        res.end("Something went to wrong!")
    })
    .on('finish',()=>{
        res.end("Done!")
    })
})

server.listen(8000,()=>{
    console.log("Server is running in 8000 port")
})
wvyml7n5

wvyml7n59#

Try catch不会捕获流中发生的错误,因为它们是在调用代码退出后抛出的。您可以参考文档:
https://nodejs.org/dist/latest-v10.x/docs/api/errors.html

相关问题