hadoop,mapreduce自定义java计数器线程“main”java.lang中出现异常illegalstateexception:作业处于定义状态而不是运行状态

q8l4jmvw  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(304)

错误是:

Exception in thread "main" java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
    at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:294)
    at org.apache.hadoop.mapreduce.Job.getCounters(Job.java:762)
    at com.aamend.hadoop.MapReduce.CountryIncomeConf.main(CountryIncomeConf.java:41)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

错误表明问题出在线路上:
counter counter=job.getcounters().findcounter(counters.missing\字段\记录\计数);
我也有一个枚举与名称计数器。

Map器:

import java.io.IOException;

        import org.apache.hadoop.io.DoubleWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Mapper;
        import org.apache.log4j.Logger;

  public class CountryIncomeMapper extends Mapper<Object, Text, Text, DoubleWritable> {

      private Logger logger = Logger.getLogger("FilterMapper");

      private final int incomeIndex = 54;
      private final int countryIndex = 0;
      private final int lenIndex = 58;

      String seperator = ",";

      public void map(Object key, Text line, Context context) throws IOException,
          InterruptedException {

        if (line == null) {
          logger.info("null found.");
          context.getCounter(COUNTERS.ERROR_COUNT).increment(1);
          return;
        }
        if (line.toString().contains(
            "Adjusted net national income per capita (current US$)")) {
          String[] recordSplits = line.toString().split(seperator);

          logger.info("The data has been splitted.");

          if (recordSplits.length == lenIndex) {

            String countryName = recordSplits[countryIndex];
            try {

              double income = Double.parseDouble(recordSplits[incomeIndex]);

              context.write(new Text(countryName), new DoubleWritable(income));

            } catch (NumberFormatException nfe) {

              logger.info("The value of income is in wrong format." + countryName);
              context.getCounter(COUNTERS.MISSING_FIELDS_RECORD_COUNT).increment(1);
              return;
            }

          }
        }
      }
    }

驾驶员等级:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class CountryIncomeConf {

  public static void main(String[] args) throws IOException,
      InterruptedException, ClassNotFoundException {

    Path inputPath = new Path(args[0]);
    Path outputDir = new Path(args[1]);

    // Create configuration
    Configuration conf = new Configuration(true);

    // Create job
    Job job = new Job(conf, "CountryIncomeConf");
    job.setJarByClass(CountryIncomeConf.class);

     Counter counter =
     job.getCounters().findCounter(COUNTERS.MISSING_FIELDS_RECORD_COUNT);

     System.out.println("Error Counter = " + counter.getValue());

    // Setup MapReduce
    job.setMapperClass(CountryIncomeMapper.class);
    job.setNumReduceTasks(1);

    // Specify key / value
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);

    // Input
    FileInputFormat.addInputPath(job, inputPath);
    job.setInputFormatClass(TextInputFormat.class);

    // Output
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setOutputFormatClass(TextOutputFormat.class);

    // Delete output if exists
    FileSystem hdfs = FileSystem.get(conf);
    if (hdfs.exists(outputDir))
      hdfs.delete(outputDir, true);

    // Execute job
    int code = job.waitForCompletion(true) ? 0 : 1;
    System.exit(code);

  }

}
tgabmvqs

tgabmvqs1#

看来你是想在提交作业之前拿到柜台。

相关问题