我是hadoop新手,我正在尝试做一个mapreduce程序,按日期(按月份分组)统计最多前两次出现的选择器。所以我的意见是这样的:
2017-06-01 , A, B, A, C, B, E, F
2017-06-02 , Q, B, Q, F, K, E, F
2017-06-03 , A, B, A, R, T, E, E
2017-07-01 , A, B, A, C, B, E, F
2017-07-05 , A, B, A, G, B, G, G
所以,我把它作为mapreducer程序的结果,比如:
2017-06, A:4, E:4
2017-07, A:4, B:4
public class ArrayGiulioTest {
public static Logger logger = Logger.getLogger(ArrayGiulioTest.class);
public static class CustomMap extends Mapper<LongWritable, Text, Text, TextWritable> {
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
TextWritable array = new TextWritable();
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line, ",");
String dataAttuale = tokenizer.nextToken().substring(0,
line.lastIndexOf("-"));
Text tmp = null;
Text[] tmpArray = new Text[tokenizer.countTokens()];
int i = 0;
while (tokenizer.hasMoreTokens()) {
String prod = tokenizer.nextToken(",");
word.set(dataAttuale);
tmp = new Text(prod);
tmpArray[i] = tmp;
i++;
}
array.set(tmpArray);
context.write(word, array);
}
}
public static class CustomReduce extends Reducer<Text, TextWritable, Text, Text> {
public void reduce(Text key, Iterator<TextWritable> values,
Context context) throws IOException, InterruptedException {
MapWritable map = new MapWritable();
Text txt = new Text();
while (values.hasNext()) {
TextWritable array = values.next();
Text[] tmpArray = (Text[]) array.toArray();
for(Text t : tmpArray) {
if(map.get(t)!= null) {
IntWritable val = (IntWritable) map.get(t);
map.put(t, new IntWritable(val.get()+1));
} else {
map.put(t, new IntWritable(1));
}
}
}
Set<Writable> set = map.keySet();
StringBuffer str = new StringBuffer();
for(Writable k : set) {
str.append("key: " + k.toString() + " value: " + map.get(k) + "**");
}
txt.set(str.toString());
context.write(key, txt);
}
}
public static void main(String[] args) throws Exception {
long inizio = System.currentTimeMillis();
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "countProduct");
job.setJarByClass(ArrayGiulioTest.class);
job.setMapperClass(CustomMap.class);
//job.setCombinerClass(CustomReduce.class);
job.setReducerClass(CustomReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TextWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
long fine = System.currentTimeMillis();
logger.info("**************************************End" + (End-Start));
System.exit(1);
}
}
我用这种方式实现了我的自定义textwritable:
public class TextWritable extends ArrayWritable {
public TextWritable() {
super(Text.class);
}
}
…所以当我运行mapreduce程序时,我得到了这样的结果
2017-6 wordcount.TextWritable@3e960865
2017-6 wordcount.TextWritable@3e960865
很明显我的减速机坏了。好像是我的Map器的输出
你知道吗?有人会说,这条路是否正确?
这里是控制台日志(仅供参考,我的输入文件有6行而不是5行)*在eclipse(mono-jvm)下启动mapreduce problem或在hdfs中使用hadoop,我得到了相同的结果
File System Counters
FILE: Number of bytes read=1216
FILE: Number of bytes written=431465
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=6
Map output records=6
Map output bytes=214
Map output materialized bytes=232
Input split bytes=97
Combine input records=0
Combine output records=0
Reduce input groups=3
Reduce shuffle bytes=232
Reduce input records=6
Reduce output records=6
Spilled Records=12
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=0
Total committed heap usage (bytes)=394264576
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=208
File Output Format Counters
Bytes Written=1813
2条答案
按热度按时间ryoqjall1#
主要问题是关于reduce方法的符号:
我在写:
public void reduce(Text key, Iterator<TextWritable> values, Context context)
而不是这就是为什么我获得map输出而不是reduce otuput
pgky5nke2#
我觉得你想在Map绘制器里做太多的工作了。您只需要对日期进行分组(根据预期的输出,您似乎没有正确格式化日期)。
例如,下面的方法将改变这些界限
换成这副做减速机
换句话说,使用
ArrayWritable
,将其保留为文本。所以,Map器看起来像这样
然后reducer可以构建一个Map,从值字符串中收集并计算值。同样,只写文本。
根据你的意见,我知道了