从一个文件中加载一百万条记录并保存到psql数据库

7gs2gvoe  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(358)

我有一个100万条记录的文件,其中我必须通过一个一个记录ElasticSearch,并将结果数据保存到数据库中。但问题是,这样做需要很长时间,因为记录一个接一个地流到elasticsearch,然后它将数据保存到psql数据库中。我想要一些建议,我该如何改进这个或应该使用一些其他工具。
现在我将nodejs与一些包一起使用:
我在nodejs应用程序中上传文件,并使用 const csv=require('csvtojson') 我用

const StreamArray = require('stream-json/streamers/StreamArray');
const {Writable} = require('stream');

用于读取json并通过这些包使用stream解析它,因为文件太大。我用这个密码

const fileStream = fs.createReadStream(this.fileName);
            const jsonStream = StreamArray.withParser();
            const incomingThis = this;
            const processingStream = new Writable({
                write({key, value}, encoding, callback) {
                    incomingThis.recordParser(value, (val, data) => { // pass the data to elasticsearch to get search data
                        incomingThis.processQueue(data); // save the data to the PSQL database
                        callback();
                    });
                },
                //Don't skip this, as we need to operate with objects, not buffers
                objectMode: true
            });
            //Pipe the streams as follows
            fileStream.pipe(jsonStream.input);
            jsonStream.pipe(processingStream);
            //So we're waiting for the 'finish' event when everything is done.
            processingStream.on('finish', async () => {
                console.log('stream end');
                const statistics = new Statistics(jobId);
                await statistics.update(); // update the job table for completion of data
            });

请建议我如何改进这一点,以便在几个小时内解析100万条记录文件,而不是几天或更短的时间。我也愿意使用任何其他工具,比如redis,spark,如果这些工具对我有帮助的话。
谢谢。

kx5bkwkv

kx5bkwkv1#

而不是一个接一个地从溪流中挤出来。使用批处理方法(创建多个批处理)以弹性方式获取数据并以批处理方式保存。

相关问题