mapreduce程序

7kqas0il  于 2021-06-02  发布在  Hadoop
关注(0)|答案(3)|浏览(504)

我创建了一个mapreduce程序来获取世界指标数据,以显示我要分析的特定指标的结果(i、 e.二氧化碳排放)。数据以长线排列,包括国家、代码、指标、第一年排放量、第二年排放量等。在我的Map绘制程序中,我试图只保留我想要的数据(首先,如果有具体指标,只保留该行),然后保留国家和所有排放水平(以字符串数组形式)。
我的整个程序都在运行,但我注意到它正在接收Map输入记录,但没有Map输出记录或reduce输入/输出记录。
我一直想弄清楚我的逻辑哪里出错了,但我被难住了。任何意见都将不胜感激。
我的代码如下:
---Map器--

package org.myorg;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CO2Mapper extends Mapper <LongWritable, Text, Text, IntWritable>
{
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String delims = ",";
        String splitString = value.toString();

        String[] tokens = splitString.split(delims);

        int tokenCount = tokens.length;
        String country = tokens[1]; 
        String indicator = tokens[3];
        int levels;

        if(indicator.equals("EN.ATM.CO2E.KT"))
        {   
            for (int j = 4; j < tokenCount; j++)
            {
                levels = Integer.parseInt(tokens[j]);
                context.write(new Text(country), new IntWritable(levels));
            }
        }
    } 
}

----减速机---

package org.myorg;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CO2Reducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int maxValue = Integer.MIN_VALUE;
        int minValue = Integer.MAX_VALUE;
        for(IntWritable val : values)
        {
            maxValue = Math.max(maxValue, val.get());
            minValue = Math.min(minValue, val.get());
        }

        context.write(key, new IntWritable(maxValue));
        context.write(key, new IntWritable(minValue));
    }
}

---主要---

package org.myorg;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
//import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class CO2Levels 
{

    public static void main(String[] args) throws Exception  
    {    
        //with mapreduce

        Configuration conf = new Configuration();
        Job job = new Job(conf, "co2Levels");

        //Job job = new Job();

        job.setJarByClass(CO2Levels.class);
        //job.setJobName("co2Levels");
        job.setMapperClass(CO2Mapper.class);
        job.setReducerClass(CO2Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setInputFormatClass(TextInputFormat.class);
        //job.setInputFormatClass(KeyValueTextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }
}
iibxawm4

iibxawm41#

从示例输入中,我发现令牌的格式是6.16e+03,它引发了一个异常,不能解析为整数。
另外,如果您想检查system.out.println()的位置,请检查这个

omtl5h9j

omtl5h9j2#

在你的主要工作中,你没有导入你的Map和减少类。将以下内容添加到main:

import org.myorg.CO2Mapper;
import org.myorg.CO2Reducer;
bqujaahr

bqujaahr3#

在分析了示例输入之后,似乎找到了问题的原因。中的以下代码块 Mapper 输入错误:

for (int j = 4; j < tokenCount; j++){
      levels = Integer.parseInt(tokens[j]);

从第5列开始,所有数值都是浮点表示(例如:“8.44e+03”),尽管它们确实是整数。因此 Integer.parseInt 正在投掷 NumberFormatException 工作失败了。我不相信“我的整个程序运行”语句(查看jobtracker上的任务日志)。如果确定输入将始终包含整数,请执行以下操作:

levels = (int) Float.parseFloat(tokens[j]);

否则更改的数据类型 levels 对map的输出值类使用floatwritable/doublewritable,并对reducer进行相关更改。
输入的另一个问题是存在空字段,这也会产生 NumberFormatException 在解析过程中。添加一些检查,如:

if (tokens[j] != null || tokens.trim().isEmpty()){
         continue; // or do the needful. eg - set levels to 0 or some default value 
  }

希望这能解决问题。但是我不明白你用在减速机上的逻辑。这可能是有意的,但似乎你的变量 maxValue & minValue 总是以 Integer.MAX_VALUE & Integer.MIN_VALUE 由于比较:

maxValue = Math.max(maxValue, val.get());
 minValue = Math.min(minValue, val.get());

这意味着上面的陈述没有用,或者我没抓住重点。不管怎样祝你好运。

相关问题