Use case:
File 1 contains impression data: a trackerId plus other fields.
File 2 contains click details: a trackerId plus a clicked flag.
I am using the two mappers below with a single reducer, but the reducer does not seem to join the data from the two files.
package com.hadoop.intellipaat;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.google.common.collect.Lists;
/**
 * This job combines clicks and impressions on trackerId.
 *
 * @author raghunandangupta
 */
public class JoinClickImpressionDetailJob {

    public static final String IMPRESSION_PREFIX = "IMPRESSION_PREFIX";
    public static final String CLICK_PREFIX = "CLICK_PREFIX";
    public static final String SEPERATOR = "~";
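
    /**
     * Emits (trackerId, IMPRESSION_PREFIX~rawRecord) for each impression row;
     * the trackerId is taken from column 18 of the comma-separated line.
     */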
    private static class ImpressionMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Exclude the header row, which contains the literal "accountId".
            if (!value.toString().contains("accountId")) {
                String[] words = value.toString().split(",");
                if (words.length > 18) {
                    context.write(new Text(words[18].trim()), new Text(IMPRESSION_PREFIX + SEPERATOR + value.toString()));
                }
            } else {
                context.write(new Text(""), value);
            }
        }
    }
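
    /**
     * Emits (trackerId, CLICK_PREFIX~rawRecord) for each click row, keyed on
     * the same column 18 so clicks meet their impressions in the reducer.
     */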
    private static class ClickMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] words = value.toString().split(",");
            if (words.length > 18) {
                context.write(new Text(words[18].trim()), new Text(CLICK_PREFIX + SEPERATOR + value.toString()));
            } else {
                context.write(new Text(""), new Text("1"));
            }
        }
    }
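
    /**
     * Collects all values for a trackerId; when exactly one impression and one
     * click arrive, writes the impression record with ",1" appended.
     */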
    private static class ImpressionClickReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) {
            try {
                System.out.println("==========" + key.toString());
                if (key.toString().length() != 0) {
                    List<Text> myList = Lists.newArrayList(values);
                    for (Text t : myList) {
                        System.out.println("#######" + t.toString());
                    }
                    System.out.println("@@@@@@@@@@@@@@@@@@@@@@@@@");
                    if (myList.size() == 2) {
                        if (myList.get(0).toString().indexOf(IMPRESSION_PREFIX) != -1
                                && myList.get(1).toString().indexOf(CLICK_PREFIX) != -1) {
                            String line = myList.get(0).toString().split(SEPERATOR)[1] + ",1";
                            context.write(key, new Text(line));
                        } else if (myList.get(1).toString().indexOf(IMPRESSION_PREFIX) != -1
                                && myList.get(0).toString().indexOf(CLICK_PREFIX) != -1) {
                            String line = myList.get(1).toString().split(SEPERATOR)[1] + ",1";
                            context.write(key, new Text(line));
                        }
                    }
                }
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            // conf.set("mapreduce.output.fileoutputformat.compress", "true");
            // conf.set("mapreduce.output.fileoutputformat.compress.codec",
            //     "org.apache.hadoop.io.compress.GzipCodec");
            // conf.set("mapreduce.map.output.compress.codec",
            //     "org.apache.hadoop.io.compress.SnappyCodec");
            // conf.set("mapreduce.output.fileoutputformat.compress.type",
            //     "BLOCK");
            Job job = Job.getInstance(conf, "IMPRESSION_CLICK_COMBINE_JOB");
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setReducerClass(ImpressionClickReducer.class);
            FileInputFormat.setInputDirRecursive(job, true);
            // FileInputFormat.addInputPath(job, new Path(args[0]));
            // job.setMapperClass(ImpressionMapper.class);
            // args[0] is the directory of impressions.
            MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, ImpressionMapper.class);
            // args[1] is the directory of clicks.
            MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ClickMapper.class);
            FileOutputFormat.setOutputPath(job, new Path(args[2]));
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Any pointers would be greatly appreciated.

For example:
File 1: [trackerId1, record1]    File 2: [trackerId1, clicked]
What I see in my output: trackerId1, [record1, record1]
What it should ideally be: trackerId1, [record1, clicked]
1 Answer

Answer from kgsdhlau1:
Your problem is most likely with this line in your reducer:

List<Text> myList = Lists.newArrayList(values);

The thing to remember is that the Iterable<Text> values reuses the Text object it hands you as you iterate. So you may add two Text objects to your list, but they both point at the same underlying object. If you look at Lists.newArrayList(), it just adds the objects to the list without creating new ones. So if you are going to use Text objects, you need to create a new one each time you add a value to the list; this is one typical reason people use Strings in this situation. A quick check to see whether this is the problem is to change that code so that you create a new Text each time.
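A minimal sketch of that change, copying each value into a fresh Text (assuming java.util.ArrayList is imported), could look like this:

List<Text> myList = new ArrayList<Text>();
for (Text v : values) {
    // new Text(v) copies the bytes, so the list entry no longer points
    // at the single instance Hadoop reuses while iterating.
    myList.add(new Text(v));
}

With each list entry holding its own copy, the reducer should see [record1, clicked] for a trackerId instead of two copies of the same record.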