如何构建ETL管道脚本来解压缩、提取、转换、保存和压缩文件?我可以使用un-gunzip,但无法提取、转换、保存和gunzip。我试图按照这个教程让我开始:https://www.mariokandut.com/transform-data-etl-pipeline-node-js/有一件事让我很困扰,那就是如何在每个顺序步骤之后循环遍历文件。我在提取步骤中得到一个意外错误SyntaxError: Unexpected end of JSON input
。
我能够在一个单独的示例中进行提取、转换和保存。尽管我无法成功地将其组合到这个ETL管道脚本中。
const fs = require('fs');
const {promises: {readdir, readFile, writeFile}} = require("fs");
var url = require('url');
const zlib = require('zlib');
const input_dir = __dirname + '/input'
const input_unzipped_dir = __dirname + '/input-unzipped'
const output_dir = __dirname + '/output'
async function get_files(dir) {
return await readdir(dir).then(response =>
response
);
}
function read_file(file_path, callback) {
fs.readFile(file_path, 'utf-8', (err, file_data) => {
if (err) {
return callback && callback(err);
}
try {
const object = JSON.parse(file_data);
return callback && callback(null, object);
} catch (err) {
return callback && callback(err);
}
})
}
function transform_JSON(file_data) {
console.log("ts is:", file_data.ts); // => "timestamp"
console.log("u is:", file_data.u); // => "url"
console.log("e is:", file_data.e); // => "event"
console.log(url.parse(file_data.u))
u = url.parse(file_data.u)
const query_map = new Map(Object.entries(file_data.e));
const output = {
timestamp: file_data.ts,
url_object: {
domain: u.host,
path: u.path,
query_object: query_map,
hash: u.hash,
},
ec: file_data.e,
}
const jsonString = JSON.stringify(output)
console.log(jsonString)
return jsonString
}
const orchestrate_etl_pipeline = async () => {
try {
// extract
files = await get_files(input_dir);
console.log(files);
if (!fs.existsSync(input_unzipped_dir)){
fs.mkdirSync(input_unzipped_dir);
}
Promise.all(files.map(filename => {
if (filename.endsWith('.gz')) {
return new Promise((resolve, reject) => {
const fileContents = fs.createReadStream(`${input_dir}/${filename}`);
const writeStream = fs.createWriteStream(`${input_unzipped_dir}/${filename.slice(0, -3)}`);
const unzip = zlib.createGunzip();
fileContents.pipe(unzip).pipe(writeStream).on('finish', (err) => {
if (err) return reject(err);
else resolve();
})
})
}
}))
.then(
console.log('unzip done')
);
// transform
files_unzipped = await get_files(input_unzipped_dir);
Promise.all(files_unzipped.map(filename => {
if (filename.endsWith('.json')) {
read_file(`${input_unzipped_dir}/${filename}`, (err, file_data) => {
if (err) {
console.error(err);
return
}
transform_JSON = transform_JSON(file_data)
console.log(transform_JSON)
})
}
}))
.then(
console.log('transform done')
);
// save file
// gunzip file
} catch (error) {
console.log(error);
}
}
orchestrate_etl_pipeline().then(console.log('etl done'));
单独转换和保存文件示例:
function jsonReader(file_path, callback) {
fs.readFile(file_path, (err, file_data) => {
if (err) {
return callback && callback(err);
}
try {
const object = JSON.parse(file_data);
return callback && callback(null, object);
} catch (err) {
return callback && callback(err);
}
});
}
jsonReader(`${input_zipped_dir}/t1669976028340.json`, (err, input) => {
if (err) {
console.log(err);
return;
}
console.log("ts is:", input.ts); // => "ts"
console.log("u is:", input.u); // => "u"
console.log("e is:", input.e); // => "e"
console.log(url.parse(input.u))
u = url.parse(input.u)
const query_map = new Map(Object.entries(input.e));
const output = {
timestamp: input.ts,
url_object: {
domain: u.host,
path: u.path,
query_object: query_map,
hash: u.hash,
},
ec: input.e,
}
jsonString = JSON.stringify(output)
console.log(jsonString)
fs.writeFile(`${input_zipped_dir}/t1669976028340.json`, jsonString, err => {
if (err) {
console.log('Error writing file', err)
} else {
console.log('Successfully wrote file')
}
})
})
1条答案
按热度按时间enyaitl31#
我推荐你使用etl-gun来创建ETL管道。它支持rxjs范式,使管道非常可视化。它有json和filesystem的端点。