我试图用node.js从pile语料库中正确读写100万行,问题是我得到了一个有100万行的输出文件,但第100万行是原始文件的900 kish行。
这是我使用的:
fs =require("fs");
var buffer = '';
var rs1 = fs.createReadStream('00.jsonl',"ascii");
var ws = fs.createWriteStream('pile_1M.txt', {flags: 'w',encoding: 'ascii'});
var lines_num=0;
var max_read=1000000;
rs1.on('data', function(chunk) {
rs1.pause()
var lines = (buffer+chunk).split(/\r?\n/g);
for (var i = 0; i < lines.length && lines_num<max_read; ++i) {
lines_num++;
ws.write(lines[i]+"\n");}
}
if(lines_num>=max_read){
rs1.close()
console.log("close")
}
rs1.resume();
});
rs1.on('close', function () {
ws.close();
});
编辑:
感谢@jfriend00,我提出了以下工作解决方案:
fs =require("fs");
var buffer = '';
var rs1 = fs.createReadStream('/media/user/hdd2/pile/00.jsonl',"utf-8");
var ws = fs.createWriteStream('pile_1M_2.txt', {flags: 'w',encoding: 'utf-8'});
var lines_num=0;
var max_read=100000;
function json_line_to_text(line){
var index=line.indexOf(', "meta": {"');
var res=line.substring(10,index-1);
if(index==-1){
res=line;
}
return res;
}
rs1.on('data', function(chunk) {
rs1.pause();
var lines = (buffer+chunk).split(/\r?\n/g);
for (var i = 0; i < lines.length && lines_num < max_read; ++i) {
if (i == lines.length - 1) {
buffer = lines[i];
} else {
if (lines_num < max_read) {
ws.write(json_line_to_text(lines[i]) + "\n");
lines_num++;
}
}
}
if(lines_num>=max_read){
rs1.close()
console.log("close")
}
rs1.resume();
});
rs1.on('close', function () {
ws.close();
});
**我包含了jsonline_to_text函数,我最初认为这是问题的真正原因,但事实并非如此。
1条答案
按热度按时间ia2d9nvy1#
您的读取不一定是整行,但您的代码似乎假定它们是整行,它们很可能不会在一行的末尾结束。
所以,你这样做:
然后你把它们算作整行,但是分割中的最后一行不一定是整行。
然后你这样做:
在每一个分割块的末尾添加一个
\n
,但是最后一个分割块可能不是一行的末尾。所以,你最终会在原来没有换行符的地方将块分割成新的一行。通常的处理方法是保存分割的最后一部分,不把它算作一行,也不把它写出来。保存它作为下一次读取的前缀。这样你就只在真正有行边界的地方写出来一个
\n
。而且,如果行边界是
\r\n
,但readstream只读取\r
,那么你可能也会遇到拆分问题。那么,你就没有正确处理这个问题。这个问题也可以通过不处理拆分的最后一部分并将其保存以与下一次读取组合来解决。然后,你知道你总是在处理它之前结束整行。另外,在这段代码中,我没有看到你重置
buffer
来排除你已经处理和写出的数据。事实上,这段代码似乎从来没有在buffer
变量中放入任何行。你应该像前面描述的那样使用它来包含最后一个部分行。在一个单独的主题中,您还应该观察以下方面的流控制:
.write()
返回一个布尔值,告诉你缓冲区是否已满,你应该在进一步写入之前等待drain
事件。这在文档中有详细说明。