hadoop中inputstream的过早eof

fhity93d  于 2021-06-02  发布在  Hadoop
关注(0)|答案(0)|浏览(268)

我想在hadoop中逐块(不是逐行)读取大文件,其中每个块的大小接近5 mb。为此我写了一个习惯 recordreader . 但这给了我一个错误 Premature EOF from inputStream ,这是由 nextKeyValue() , readfully() ,同时阅读。
这是我的密码:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<Text, apriori> {

public Text key = new Text("");
public apriori value = new apriori();
public Configuration job;
public FileSplit filesplit;
public FSDataInputStream in;
public Boolean processed = false;
public int len = 5000000;
public long filepointer = 0;
public int mapperFlag = 0;

public WholeFileRecordReader(FileSplit arg0, TaskAttemptContext arg1) {
    this.filesplit = arg0;
    this.job=arg1.getConfiguration();
}

@Override
public void close() throws IOException {

}

@Override
public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
}

@Override
public apriori getCurrentValue() throws IOException, InterruptedException {
        return value;
}

@Override
public float getProgress() throws IOException, InterruptedException {
    return processed ? 1.0f : 0.0f;
}

@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1)
        throws IOException, InterruptedException {
    this.job = arg1.getConfiguration();
    this.filesplit = (FileSplit)arg0;
    final Path file = filesplit.getPath();

    FileSystem fs = file.getFileSystem(job);
    in = fs.open(file);
    }

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if ((!processed)&&(filesplit.getLength()>filepointer)) {
        byte[] contents = new byte[ len];
        Path file = filesplit.getPath();
        key.set(file.getName());
        in.seek(filepointer);
        try {
            IOUtils.readFully(in, contents, 0, len);
            value.set(contents, 0, len);
                        } finally {
    //        IOUtils.closeStream(in);
        }
        filepointer = filepointer + len;
        processed = false;
        return true;
    }
    else if((!processed)&&(filesplit.getLength()<filepointer))
    {
        Path file = filesplit.getPath();
        key.set(file.getName());
        int last = (int)(filesplit.getLength()-(filepointer-len));
        byte[] contents = new byte[last];
        in.seek(filepointer-len);
        try {
            IOUtils.readFully(in, contents, 0, last);
            mapperFlag =1;
            value.set(contents, 0, last,mapperFlag);

        } finally {
            IOUtils.closeStream(in);
        }
        processed = true;
        return true;
    }

    return false;
 }
 }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题