Node.js将流复制到文件中,而不消耗

qlvxas9a  于 2023-10-17  发布在  Node.js
关注(0)|答案(2)|浏览(168)

给定一个函数解析传入的流:

async onData(stream, callback) {
    const parsed = await simpleParser(stream)

    // Code handling parsed stream here
    // ...

    return callback()
}

我正在寻找一种简单而安全的方法来“克隆”该流,这样我就可以将其保存到一个文件中用于调试目的,而不会影响代码。这可能吗?
同样的问题在假代码:我也想做这样的事。显然,这是一个虚构的例子,不起作用。

const fs = require('fs')
const wstream = fs.createWriteStream('debug.log')

async onData(stream, callback) {
    const debugStream = stream.clone(stream) // Fake code
    wstream.write(debugStream)

    const parsed = await simpleParser(stream)

    // Code handling parsed stream here
    // ...

    wstream.end()

    return callback()
}
i7uq4tfw

i7uq4tfw1#

不,你不能克隆一个可读的流而不消费。但是,您可以通过管道传输两次,一次用于创建文件,另一次用于“克隆”。
代码如下:

let Readable = require('stream').Readable;
var stream = require('stream')

// original stream, maybe from your parser or network
var s = new Readable()
s.push('beep')
s.push(null)  

// here use stream1 for normal usage, like creating file, 
// and use stream2 for debugging, like a cloned stream.
var stream1 = s.pipe(new stream.PassThrough())
var stream2 = s.pipe(new stream.PassThrough())

// I just print them out for a quick show
stream1.pipe(process.stdout)
stream2.pipe(process.stdout)
yqlxgs2m

yqlxgs2m2#

我尝试过实现@jiajianrong提供的解决方案,但是很难让它与ReadStream一起工作,因为当我尝试直接推送ReadStream时,Readable会抛出错误。比如:

s.push(createReadStream())

为了解决这个问题,我使用了一个辅助函数将流转换为缓冲区。

function streamToBuffer (stream: any) {
  const chunks: Buffer[] = []
  return new Promise((resolve, reject) => {
    stream.on('data', (chunk: any) => chunks.push(Buffer.from(chunk)))
    stream.on('error', (err: any) => reject(err))
    stream.on('end', () => resolve(Buffer.concat(chunks)))
  })
}

下面的解决方案,我发现使用一个管道来生成流的散列,另一个管道将流上传到云存储。

import stream from 'stream'
const Readable = require('stream').Readable

const s = new Readable()
s.push(await streamToBuffer(createReadStream()))
s.push(null)

const fileStreamForHash = s.pipe(new stream.PassThrough())
const fileStreamForUpload = s.pipe(new stream.PassThrough())

// Generating file hash
const fileHash = await getHashFromStream(fileStreamForHash)

// Uploading stream to cloud storage
await BlobStorage.upload(fileName, fileStreamForUpload)

我的回答大多是根据贾建荣的回答。

相关问题