NodeJS 在promise.all()中下载s3客户端时丢失网络连接

ljsrvy3e  于 2023-10-17  发布在  Node.js
关注(0)|答案(1)|浏览(112)

我正在使用一个基于nodejs的服务器,它从s3下载1000个pdf文件,每个9 MB,压缩成zip文件,并返回URL。
但是,从S3下载时出现问题。当我开始在S3上下载时,大约下载了100个下载,然后无法连接互联网。看起来它不相关的大小,因为它是成功的,当尝试与5个文件与1GB的文件。
这会导致mysql连接丢失> transaction fail [ERROR] Error: BEGIN; - Can't add new command when connection is in closed statelost connection
我怀疑繁重的网络操作不应该放在promise.all()中,但我不知道确切的原因。救命啊!救命啊!

await Promise.all(
  Ids.map((id) => {
    const readablePayloads: Array<{ filename: string; bugffer: Readable; }> = [];
    // get key, bucket by id
    
    const buffer = await S3Helper.getReadable({bucket, key} {accessKeyId, secretAccessKey});
    readablePayloads.push({ filename, buffer });
  })
)
static async getReadable(opts: { bucket: string; key: string }, awsOpts: AwsOptions): Promise<Readable> {
    return (await createS3Client(awsOpts).send(new AWS.GetObjectCommand({ Bucket: opts.bucket, Key: opts.key }))).Body as Readable;
  }

可以肯定的是,这段代码工作的连接数不到100个

klsxnrf1

klsxnrf11#

我认为这个问题的几个可能原因:

  • 文件描述符限制。

任何打开的连接都使用文件描述符。Linux默认的每个进程的文件描述符限制是1024。因此,除了分叉你的node.js进程或增加限制之外,你对此无能为力。不推荐后者,除非你有很好的理由。另一种方法是限制连接的数量,我将在下面介绍。

  • 内存堆限制。

这一点可能性较小,但仍然:每个打开的连接使用一定量的存储器。此外,根据您编写代码和处理可读流的方式,您可能会在可读流中使用过多的缓冲甚至内存泄漏。

限制连接数

以较小的组下载文件,而不是一次全部下载。它可能看起来像这样:

// _.chunk comes from the `lodash` library.
// Now `chunksOfIds` is an array of arrays, each of which contains up to 100 ids.
// i.e [[1,2,...,100],[101,102,...,200],...,[1001,1002,...,1100]]
const chunksOfIds = _.chunk(Ids, 100)

for (const chunk of chunksOfIds) {
  const readablePayloadsOfThisChunk: Array<{ filename: string; bugffer: Readable; }> = [];

  chunk.map((id) => {
    // get key, bucket by id

    const buffer = await S3Helper.getReadable({bucket, key} {accessKeyId, secretAccessKey});
    readablePayloadsOfThisChunk.push({ filename, buffer });
  })

  // Note: you have to process your `readablePayloads` here, in the loop,
  // so that you will finish your computations before the processing of the next chunk will start
  await processReadablePayloads(readablePayloadsOfThisChunk)
}

相关问题