我有一个100万条记录的文件,其中我必须通过一个一个记录ElasticSearch,并将结果数据保存到数据库中。但问题是,这样做需要很长时间,因为记录一个接一个地流到elasticsearch,然后它将数据保存到psql数据库中。我想要一些建议,我该如何改进这个或应该使用一些其他工具。
现在我将nodejs与一些包一起使用:
我在nodejs应用程序中上传文件,并使用 const csv=require('csvtojson')
我用
const StreamArray = require('stream-json/streamers/StreamArray');
const {Writable} = require('stream');
用于读取json并通过这些包使用stream解析它,因为文件太大。我用这个密码
const fileStream = fs.createReadStream(this.fileName);
const jsonStream = StreamArray.withParser();
const incomingThis = this;
const processingStream = new Writable({
write({key, value}, encoding, callback) {
incomingThis.recordParser(value, (val, data) => { // pass the data to elasticsearch to get search data
incomingThis.processQueue(data); // save the data to the PSQL database
callback();
});
},
//Don't skip this, as we need to operate with objects, not buffers
objectMode: true
});
//Pipe the streams as follows
fileStream.pipe(jsonStream.input);
jsonStream.pipe(processingStream);
//So we're waiting for the 'finish' event when everything is done.
processingStream.on('finish', async () => {
console.log('stream end');
const statistics = new Statistics(jobId);
await statistics.update(); // update the job table for completion of data
});
请建议我如何改进这一点,以便在几个小时内解析100万条记录文件,而不是几天或更短的时间。我也愿意使用任何其他工具,比如redis,spark,如果这些工具对我有帮助的话。
谢谢。
1条答案
按热度按时间kx5bkwkv1#
而不是一个接一个地从溪流中挤出来。使用批处理方法(创建多个批处理)以弹性方式获取数据并以批处理方式保存。