如何在Node.js中执行多个顺序操作?

vltsax25  于 2023-05-28  发布在  Node.js
关注(0)|答案(1)|浏览(257)

如何构建ETL管道脚本来解压缩、提取、转换、保存和压缩文件?我可以使用un-gunzip,但无法提取、转换、保存和gunzip。我试图按照这个教程让我开始:https://www.mariokandut.com/transform-data-etl-pipeline-node-js/有一件事让我很困扰,那就是如何在每个顺序步骤之后循环遍历文件。我在提取步骤中得到一个意外错误SyntaxError: Unexpected end of JSON input
我能够在一个单独的示例中进行提取、转换和保存。尽管我无法成功地将其组合到这个ETL管道脚本中。

const fs = require('fs');
const {promises: {readdir, readFile, writeFile}} = require("fs");
var url = require('url');
const zlib = require('zlib');

const input_dir = __dirname + '/input'
const input_unzipped_dir = __dirname + '/input-unzipped'
const output_dir = __dirname + '/output'

async function get_files(dir) {
  return await readdir(dir).then(response =>
    response
  );
}

function read_file(file_path, callback) {
    fs.readFile(file_path, 'utf-8', (err, file_data) => {
      if (err) {
        return callback && callback(err);
      }

      try {
        const object = JSON.parse(file_data);
        return callback && callback(null, object);
      } catch (err) {
        return callback && callback(err);
      }
    })
}

function transform_JSON(file_data) {
  console.log("ts is:", file_data.ts); // => "timestamp"
  console.log("u is:", file_data.u); // => "url"
  console.log("e is:", file_data.e); // => "event"

  console.log(url.parse(file_data.u))
  u = url.parse(file_data.u)

  const query_map = new Map(Object.entries(file_data.e));

  const output = {
    timestamp: file_data.ts,
    url_object: {
      domain: u.host,
      path: u.path,
      query_object: query_map,
      hash: u.hash,
    },
    ec: file_data.e,
  }
  const jsonString = JSON.stringify(output)
  console.log(jsonString)
  return jsonString
}

const orchestrate_etl_pipeline = async () => {
  try {
    // extract

    files = await get_files(input_dir);

    console.log(files);

    if (!fs.existsSync(input_unzipped_dir)){
      fs.mkdirSync(input_unzipped_dir);
    }

    Promise.all(files.map(filename => {
      if (filename.endsWith('.gz')) {
        return new Promise((resolve, reject) => {
          const fileContents = fs.createReadStream(`${input_dir}/${filename}`);
          const writeStream = fs.createWriteStream(`${input_unzipped_dir}/${filename.slice(0, -3)}`);
          const unzip = zlib.createGunzip();
          fileContents.pipe(unzip).pipe(writeStream).on('finish', (err) => {
            if (err) return reject(err);
            else resolve();
          })
        })
      }
    }))
    .then(
      console.log('unzip done')
    );
      
    // transform

    files_unzipped = await get_files(input_unzipped_dir);

    Promise.all(files_unzipped.map(filename => {
      if (filename.endsWith('.json')) {
        read_file(`${input_unzipped_dir}/${filename}`, (err, file_data) => {
          if (err) {
            console.error(err);
            return
          }

          transform_JSON = transform_JSON(file_data)
          
          console.log(transform_JSON)
        })
      }
    }))
    .then(
      console.log('transform done')
    );

    // save file
    // gunzip file
  } catch (error) {
    console.log(error);
  }
}

orchestrate_etl_pipeline().then(console.log('etl done'));

单独转换和保存文件示例:

function jsonReader(file_path, callback) {
  fs.readFile(file_path, (err, file_data) => {
    if (err) {
      return callback && callback(err);
    }
    try {
      const object = JSON.parse(file_data);
      return callback && callback(null, object);
    } catch (err) {
      return callback && callback(err);
    }
  });
}

jsonReader(`${input_zipped_dir}/t1669976028340.json`, (err, input) => {
  if (err) {
    console.log(err);
    return;
  }
  console.log("ts is:", input.ts); // => "ts"
  console.log("u is:", input.u); // => "u"
  console.log("e is:", input.e); // => "e"

  console.log(url.parse(input.u))
  u = url.parse(input.u)

  const query_map = new Map(Object.entries(input.e));

  const output = {
    timestamp: input.ts,
    url_object: {
      domain: u.host,
      path: u.path,
      query_object: query_map,
      hash: u.hash,
    },
    ec: input.e,
  }
  
  jsonString = JSON.stringify(output)

  console.log(jsonString)

  fs.writeFile(`${input_zipped_dir}/t1669976028340.json`, jsonString, err => {
    if (err) {
      console.log('Error writing file', err)
    } else {
      console.log('Successfully wrote file')
    }
  })
})
enyaitl3

enyaitl31#

我推荐你使用etl-gun来创建ETL管道。它支持rxjs范式,使管道非常可视化。它有json和filesystem的端点。

相关问题