Handling large CSV files in NestJS with fast-csv

jum4pzuy · posted 2023-04-03 in Other

I'm using NestJS v9, fast-csv v4, and BigQuery.
1. My controller (where I upload the huge CSV)

@Post('upload')
@ApiOperation({ description: 'Upload CSV File' })
@ApiConsumes('multipart/form-data')
@ApiBody({
  schema: {
    type: 'object',
    properties: {
      file: {
        type: 'string',
        format: 'binary',
      },
    },
  },
})
@ApiResponse({
  type: UploaderResponse,
  status: 200,
})
@UseInterceptors(FileInterceptor('file', { fileFilter: fileFilter }))
@Auth(UserRole.Admin, UserRole.SuperAdmin)
async uploadFile(
  @Body() uploadCsvDto: UploadCsvDto,
  @UploadedFile() file: Express.Multer.File,
): Promise<UploaderResponse> {
  if (!file) {
    throw new HttpException(FileErrors.CSVFormat, HttpStatus.BAD_REQUEST);
  }
  return await this.filesService.uploadCsv(file, uploadCsvDto);
}
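
As a side note, the fileFilter used in the interceptor above is not shown. A minimal CSV filter might look like the sketch below; the extension check and the rejection message are assumptions, not the original code.

// Hypothetical sketch of a fileFilter that only accepts .csv uploads.
import { BadRequestException } from '@nestjs/common';
import { Request } from 'express';

export const fileFilter = (
  req: Request,
  file: Express.Multer.File,
  callback: (error: Error | null, acceptFile: boolean) => void,
): void => {
  if (file.originalname.toLowerCase().endsWith('.csv')) {
    callback(null, true); // accept: the controller receives the file
  } else {
    callback(new BadRequestException('Only CSV files are allowed'), false);
  }
};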

2. My service

async uploadCsv(
  fileData: Express.Multer.File,
  uploadCsvDto: UploadCsvDto,
): Promise<UploaderResponse> {
  memoryUsage();
  await this.checkForDuplicates(uploadCsvDto);
  memoryUsage();

  // 2. Parse CSV
  await this.batchCsvResults(fileData.buffer, uploadCsvDto);
}

async batchCsvResults(
  buffer: Buffer,
  uploadCsvDto: UploadCsvDto,
): Promise<void> {
  const streamFromBuffer = Readable.from(buffer);
  const stream = parseStream(streamFromBuffer, {
    headers: (headers: string[]) => {
      return headers.map((h: string) => {
        if (this.checkCorrectColumn(h)) {
          return VALID_CSV_COLUMNS.find(
            (column: ValidColumn) => column.name === h.toLowerCase(),
          ).column;
        } else {
          console.log(`'${h}' is not a valid column.`);
        }
      });
    },
    delimiter: ';',
    encoding: 'utf8',
    trim: true, // trim white spaces on columns only
  });

  let batch: DataRow[] = [];

  // Promise used to wait until all batches have finished
  let resolvePromise: () => void;
  const processingPromise: Promise<void> = new Promise<void>((resolve) => {
    resolvePromise = resolve;
  });

  stream.on('data', async (row: DataRow) => {
    batch.push(row);

    if (batch.length === 200) {
      stream.pause();
      await this.processBatch(batch, uploadCsvDto)
        .then(() => {
          batch.length = 0;
          stream.resume();
        })
        .catch((err) => {
          console.error(err);
          stream.destroy();
        });
    }
  });

  stream.on('end', async () => {
    if (batch.length > 0) {
      await this.processBatch(batch, uploadCsvDto)
        .then(() => {
          batch = null;
          console.log('All batches have been processed');
        })
        .catch((err) => {
          console.error(err);
        });
    } else {
      console.log('All batches have been processed');
    }
    resolvePromise();
  });

  stream.on('error', (err) => {
    console.error(err);
  });

  await processingPromise;
}

async mapChildRows(
  data: DataRow[],
  uploadCsvDto: UploadCsvDto,
): Promise<void> {
  memoryUsage();
  let formattedRows = [];
  for (let i = 0; i < data.length; i++) {
    // Convert object values to their type
    const dataObject: any = data[i];
    Object.keys(dataObject).forEach((key) => {
      dataObject[key] = this.getChildColumnValue(key, dataObject[key]);
    });

    // Add dynamic fields
    dataObject[DYNAMIC_CSV_COLUMNS.idClient] = uploadCsvDto.idClient;
    dataObject[DYNAMIC_CSV_COLUMNS.subservice] = uploadCsvDto.subservice;
    dataObject[DYNAMIC_CSV_COLUMNS.subtask] = uploadCsvDto.subtask;

    // Add the current date to the row. The row has one extra field at the end because
    // upload_date is not sent in the CSV; it is a fixed column whose value we add ourselves.
    dataObject[DYNAMIC_CSV_COLUMNS.upload_date] = actualDate();
    formattedRows.push(dataObject);
  }

  memoryUsage();
  await this.insertIntoCustomAnalyticsBigQuery(formattedRows);
  data = null;
  formattedRows = null;
}

async insertIntoCustomAnalyticsBigQuery(rows: DataRow[]) {
  try {
    memoryUsage();
    await this.bigQueryClient.dataset(DB_NAME).table(DATA_TABLE).insert(rows);
    console.log(`Inserted ${rows.length} rows`);
  } catch (ex) {
    console.log(JSON.stringify(ex));
    throw new HttpException(
      'Error inserting into Data table from CSV.',
      HttpStatus.BAD_REQUEST,
    );
  }
}
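
For comparison only (not part of the original service): because 'data' handlers are not awaited, mixing an async callback with stream.pause()/stream.resume() as in batchCsvResults is easy to get wrong. The same 200-row batching can also be written with async iteration over the fast-csv stream, where awaiting inside the loop provides the backpressure. A minimal sketch, reusing DataRow and processBatch from above and replacing the header-mapping callback with headers: true for brevity:

// Alternative sketch: 200-row batching via async iteration instead of
// 'data' events with manual pause()/resume().
async batchCsvResultsIterated(
  buffer: Buffer,
  uploadCsvDto: UploadCsvDto,
): Promise<void> {
  const stream = parseStream(Readable.from(buffer), {
    headers: true,
    delimiter: ';',
    trim: true,
  });

  let batch: DataRow[] = [];
  for await (const row of stream) {
    batch.push(row as DataRow);
    if (batch.length === 200) {
      // Awaiting here pauses the parser until the batch has been processed.
      await this.processBatch(batch, uploadCsvDto);
      batch = [];
    }
  }
  if (batch.length > 0) {
    await this.processBatch(batch, uploadCsvDto);
  }
}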

3. Service explanation

  • First, I check BigQuery for duplicates from previously uploaded CSVs (this part is not the issue).
  • Then I use a stream (fast-csv) so that the whole CSV content does not end up in memory.
  • I transform the data in batches of 200 rows, and from those 200 rows I insert into BigQuery 50 rows at a time (see the sketch right after this list).
  • Finally, everything is saved into BigQuery.
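
processBatch is called above but its body is not shown. Based on the description (200-row batches, inserted into BigQuery 50 rows at a time), it might look roughly like the sketch below; the CHUNK_SIZE constant and the slicing loop are assumptions, while mapChildRows is the method shown earlier.

// Hypothetical sketch of processBatch: split a 200-row batch into 50-row
// chunks and let mapChildRows transform and insert each chunk.
private readonly CHUNK_SIZE = 50;

async processBatch(
  batch: DataRow[],
  uploadCsvDto: UploadCsvDto,
): Promise<void> {
  for (let i = 0; i < batch.length; i += this.CHUNK_SIZE) {
    const chunk = batch.slice(i, i + this.CHUNK_SIZE);
    await this.mapChildRows(chunk, uploadCsvDto);
  }
}
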
    **Problem:** this works fine for a 50,000-row file, but larger files give me memory problems:
    **50,000-row CSV:** (memory usage screenshot)
    **250,000-row CSV:** (memory usage screenshot)

I don't understand why, since I clear all the variables I no longer need.

My Node server also tells me that my maximum memory size is 512 MB.

My memory-check function:

export function memoryUsage(): void {
  return console.log(
    `APP is using ${
      Math.round((process.memoryUsage().rss / 1024 / 1024) * 100) / 100
    } MB of memory.`,
  );
}

5cg8jx4n · Answer #1

In case anyone else is stuck on this: I found a solution. You just have to process the CSV buffer in chunks.
Something like this:

const ROWS_BATCH = 200;
const blockSize = 100 * 100 * 100; // 1,000,000 bytes, ~0.95 MB per block
const csvHeaders: string[] = [];
let offset = 0;

// 1. Chunk buffer, 2. Chunk rows, 3. Chunk insertions into BigQuery
while (offset < buffer.length) {
  const remainingBytes = buffer.length - offset;
  const currentBlockSize = Math.min(blockSize, remainingBytes);
  const chunk = buffer.slice(offset, offset + currentBlockSize);

  const streamFromBuffer = Readable.from(chunk);
  // rest of code ...
}

So instead of feeding the whole file buffer to the parser at once, you feed it in chunks.
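
A few details are hidden behind "rest of code ...". The sketch below shows one way the loop could be completed so that a block boundary never cuts a CSV row in half and every block is parsed with the header row from the top of the file; the carry-over logic, the parseBlock helper and the onRow callback are assumptions, not the answer's actual code.

// Hypothetical completion of the chunked-parsing idea above.
import { Readable } from 'stream';
import { parseStream } from 'fast-csv';

const NEWLINE = 0x0a; // '\n'
const blockSize = 100 * 100 * 100; // 1,000,000 bytes, ~0.95 MB per block

// Parse one block of CSV bytes (which must start with a header line).
async function parseBlock(
  block: Buffer,
  onRow: (row: Record<string, string>) => Promise<void>,
): Promise<void> {
  const stream = parseStream(Readable.from(block), {
    headers: true,
    delimiter: ';',
    trim: true,
  });
  for await (const row of stream) {
    await onRow(row); // e.g. collect into 200-row batches and insert into BigQuery
  }
}

async function parseBufferInBlocks(
  buffer: Buffer,
  onRow: (row: Record<string, string>) => Promise<void>,
): Promise<void> {
  let offset = 0;
  let carryOver = Buffer.alloc(0); // bytes of an incomplete row from the previous block
  let headerLine: Buffer | null = null; // first CSV line, prepended to every later block

  while (offset < buffer.length) {
    const end = Math.min(offset + blockSize, buffer.length);
    let block = Buffer.concat([carryOver, buffer.subarray(offset, end)]);
    offset = end;

    if (offset < buffer.length) {
      // Hold back everything after the last newline so a row is never split
      // across two blocks; it is prepended to the next block instead.
      const lastNewline = block.lastIndexOf(NEWLINE);
      carryOver = block.subarray(lastNewline + 1);
      block = block.subarray(0, lastNewline + 1);
    } else {
      carryOver = Buffer.alloc(0); // last block: parse whatever is left
    }

    if (block.length === 0) continue; // no complete row yet, keep accumulating

    if (headerLine === null) {
      // The first block starts with the header row; remember it for later blocks.
      headerLine = block.subarray(0, block.indexOf(NEWLINE) + 1);
      await parseBlock(block, onRow);
    } else {
      await parseBlock(Buffer.concat([headerLine, block]), onRow);
    }
  }
}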
