Passing parameters to the record reader in Hadoop MapReduce

yvfmudvl · asked 2021-05-29 · Hadoop · 2 answers

Here is my code, which takes several arguments:

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.poi.hwpf.HWPFDocument;
import org.apache.poi.hwpf.extractor.WordExtractor;

public class Docsparser {
      private static String Delimiter;

    public static class DocsInputFormat extends FileInputFormat<Text, Text> {

          @Override
          public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
              return new DocsLineRecordReader();
          }
    }

          public static  class DocsLineRecordReader extends RecordReader<Text, Text> {

              private Text key = new Text();
              private Text value = new Text();
              private int currentword = 0;
              private String fileline;
              private File file = null;
              private  String line; 
              private HWPFDocument document;
              private WordExtractor extractor = null;
              private String[] filedata;
              StringBuilder sb = new StringBuilder();

              @Override
              public void initialize(InputSplit split, TaskAttemptContext context)
                      throws IOException, InterruptedException {

                  FileSplit fileSplit = (FileSplit) split;
                  final Path file = fileSplit.getPath();
                  Configuration conf = context.getConfiguration();
                    FileSystem fs = file.getFileSystem(conf);
                    FSDataInputStream filein = fs.open(fileSplit.getPath());

                    String Delim = conf.get("Delim");
                      if (filein != null)
                      {
                          HWPFDocument document = new HWPFDocument(filein);
                          extractor = new WordExtractor(document);
                          fileline = extractor.getText();

                          filedata = fileline.split(Delim);
                      }
                    }

              @Override
              public boolean nextKeyValue() throws IOException, InterruptedException
              {

                  if (key == null) {
                      key = new Text();
                  }

                  if (value == null) {
                      value = new Text();
                  } 
                  if(currentword < filedata.length)
                  {
                      for (currentword = 0; currentword < filedata.length; currentword++)
                      {
                          sb.append(filedata[currentword] + ",");
                          line = sb.toString();
                      }

                      key.set(line);
                      value.set("");
                      return true;
                  }
                  else
                  {
                      key = null;
                      value = null;
                        return false;
                  }

              }

              @Override
              public Text getCurrentKey() throws IOException, InterruptedException {
                  return key;
              }

              @Override
              public Text getCurrentValue() throws IOException, InterruptedException {
                  return value;
              }

              @Override
              public float getProgress() throws IOException, InterruptedException {
                  return (100.0f / filedata.length * currentword) / 100.0f;
              }

             @Override
              public void close() throws IOException {

                }
             }

    public static class Map extends Mapper<Text, Text, Text, Text>{

        public void map(Text key, Text value, Context context) throws IOException, InterruptedException
        {

                    context.write(key,value);

         }
    }

        public static void main(String[] args) throws Exception
        {

                Configuration conf = new Configuration();
                Job job = new Job(conf, "Docsparser");
                job.setJarByClass(Docsparser.class);

                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);

                job.setMapperClass(Map.class);
                job.setNumReduceTasks(0);

                FileInputFormat.setInputPaths(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));

                Delimiter = args[2].toString();
                conf.set("Delim",Delimiter);

                job.setInputFormatClass(DocsInputFormat.class);
                job.setOutputFormatClass(TextOutputFormat.class);

                System.exit(job.waitForCompletion(true) ? 0 : 1);

        }

}

Exception details:

15/09/28 03:50:04 INFO mapreduce.Job: Task Id : attempt_1443193152998_2319_m_000000_2, Status : FAILED
Error: java.lang.NullPointerException
        at java.lang.String.split(String.java:2272)
        at java.lang.String.split(String.java:2355)
        at com.nielsen.grfe.Docsparser$DocsLineRecordReader.initialize(Docsparser.java:66)
        at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:548)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:786)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

js81xvg6 (answer 1):

All configuration variables must be set before the Job object is initialized. Move

Delimiter = args[2].toString(); 
    conf.set("Delim",Delimiter);

before

Job job = new Job(conf, "Docsparser");
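
Put together, a minimal sketch of the corrected main() might look like this (Job.getInstance is swapped in for the deprecated new Job(conf, ...) constructor; everything else is unchanged from the question):

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Set custom parameters before creating the Job: Job copies the
    // Configuration when it is constructed, so later conf.set() calls
    // are never seen by the map tasks.
    conf.set("Delim", args[2]);

    // Job.getInstance(conf, name) is the non-deprecated replacement
    // for new Job(conf, name).
    Job job = Job.getInstance(conf, "Docsparser");
    job.setJarByClass(Docsparser.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(Map.class);
    job.setNumReduceTasks(0);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setInputFormatClass(DocsInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}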
e5nszbig (answer 2):

The NullPointerException occurs in the split method, on the fileline string. I suspect that you have not set the "Delim" configuration value, so the Delim variable is null.
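
As a defensive variant, initialize() could fail fast when the value is missing. The sketch below assumes the same field names as the question; the Pattern.quote call (which treats the delimiter as a literal string rather than as a regular expression) is an addition for robustness, not part of the original code, and requires import java.util.regex.Pattern:

// Inside DocsLineRecordReader.initialize(), after obtaining conf:
String delim = conf.get("Delim");
if (delim == null) {
    // Fail with a clear message instead of an NPE deep inside String.split
    throw new IOException("Required configuration value \"Delim\" was not set; "
            + "call conf.set(\"Delim\", ...) before constructing the Job");
}
// Pattern.quote makes delimiters such as "|" or "." split literally
filedata = fileline.split(Pattern.quote(delim));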
