我试着用map/reducer做不同的工作人员,而不是以前我做的。
我现在有一个文件输入如下:
1 50000 2015 pc technology
2 15424 1998 mouse technology
3 78420 2010 pen technology
4 8452 2000 pen stationery
5 4125 2000 pen stationery
id、价格、年份、项目、类型
我要做的是计算一种特定商品的平均价格,每种商品的平均价格,以及每一年的平均价格。所以,举个例子,我开始为钢笔做这些东西。2000年钢笔的平均价格是多少?在我的示例中,有两种笔(pc的数字笔和标准笔),因此我希望有如下输出:
pen stationery 6288 2000
pen technology 78420 2010
我的问题是我不知道怎么做。。。我知道如何使用合并器/减速机计算平均值,但我不知道如何记录年份和项目。。。我的逻辑是这样的:检查物品是否是笔,然后记录年份和笔的价格,用这些数据计算平均值。但我不知道怎么做。任何帮助都非常感谢,谢谢
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Draft2 {
public static class TokenizerMapper extends Mapper<Object, Text, Text, Text>{
private Text word = new Text();
private Text word2 = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] tokens = value.toString().split(",");
String price = "";
String item = "";
String typ = "";
String year = "";
if(tokens.length >= 2)
price = tokens[1];
if(tokens.length >= 3)
year = tokens[2];
if (tokens.length >=5)
typ = tokens[4];
if (tokens.length >=14)
item = tokens[13];
if(!item.isEmpty())
{
word.set(item + "|" + year);
word2.set(price + "|" + typ);
context.write(word, word2);
}
}
}
public static class MeanCombiner extends Reducer <Text,Text,Text,Text> {
private Text word = new Text();
private Text word2 = new Text();
public void combine(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int sum = 0;
int counter = 0;
final Iterator<Text> itr = values.iterator();
String[] strs = key.toString().split("|");
final String[] tokens = values.toString().split("|");
String item = strs[0];
String typ = tokens[1];
String type = "pen";
String year = strs[1];
Boolean found = false;
for (Text val: values) {
String[] strs = key.toString().split("|");
final String[] tokens = values.toString().split("|");
String item= strs[0];
String typ = tokens[1];
String type = "pen";
String year = strs[1];
if (typ.equals(type)) {
found = true;
//don't know how to go on
}
/*this part is for the average but with wrong data*/
while (itr.hasNext()) {
if (typ.equals(type)) {
final String price = tokens[0];
final int value = Integer.parseInt(price);
counter++;
sum += value;
}
}
final int average = sum/counter;
String avg = Integer.toString(average);
String count = Integer.toString(counter);
word.set(key.toString());
word2.set(avg + "|" + count);
context.write(word, word2);
}
}
/*The reducer is incomplete*/
public static class MeanReducer extends Reducer<Text,Text,Text,Text> {
private Text word = new Text();
private Text word2 = new Text();
public void reduce(Text key, Iterable<Text> values, Context context
) throws IOException, InterruptedException {
word.set(k);
word2.set();
context.write(word, word2);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Draft2");
job.setJarByClass(Draft2.class);
job.setCombinerClass(MeanCombiner.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(MeanReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
1条答案
按热度按时间p5cysglq1#
计算平均值时不应使用合并器。
假设以下是您的输入,我将解释解决方案:
Map器:
通过在空白处拆分(“”)来分析每条记录
对于每条记录,将(key,value)作为(year | item | type,price)发出。组合(year | item | type)将为每个记录提供一个唯一的键。例如,对于记录“5 4125 2000 pen文具”,您将发出键:“2000 | pen |文具”,值为:4125。
因此Map器的输出将是:
减速器:
对于每个键,先计算和,然后计算平均值。
发射平均值以及其他细节
例如,以下键将连接到同一减速器:
输出为: