我有一个很大的tsv文件,有以下输入:
Site1 Tag1
Site1 Tag34
Site1 Tag8
Site2 Tag75
Site2 Tag54
Site2 Tag8
Site3 Tag24
Site3 Tag34
Site3 Tag1
...
我想在hadoopmapreduce的帮助下,找到输入中所有站点之间的相似站点对,以及每个站点对中相似标记的数量。
部分输入输出:
Site1 Site2 1 // Site1 is similar to Site2 with 1 tag (Tag8)
Site1 Site3 2 // Site1 is similar to Site3 with 2 tag (Tag1 and Tag34)
Site2 Site1 1
Site3 Site1 2
我想输出每个网站只有10个最相似的网站。
每个站点有3个标签
我想使用2个mapreduce作业:
Map标签(键)和站点并按标签缩小,在缩小阶段将所有站点作为特定标签并写入输出“tag sitex sitey”
第二个mapreduce作业将接受第一个输入,并将按sitex、sitey对执行分组,以获取相似站点对中相似标记的数量。
我试图实现第一个mapred,但我得到的只是“tag,site”输出。
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class RawToSimilarTagMapper {
public static class TagToSiteMapper extends Mapper<Object, Text, Text, Text> {
private Text site = new Text();
private Text tag = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String [] siteTag = value.toString().split("\t");
site.set(siteTag[0]);
tag.set(siteTag[1]);
context.write(tag, site);
System.out.println();
}
}
public static class SimilarSiteReducer extends Reducer<Text, Text, Text, Text> {
private Text value = new Text();
public void reduce(Text key, Iterable<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException, InterruptedException {
for (Text text : values) {
for (Text text2 : values) {
if (!text.equals(text2)) {
value.set(text.toString() + "\t" + text2.toString());
output.collect(key, value);
}
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "raw-to-similar");
job.setJarByClass(RawToSimilarTagMapper.class);
job.setMapperClass(TagToSiteMapper.class);
job.setCombinerClass(SimilarSiteReducer.class);
job.setReducerClass(SimilarSiteReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
FileSystem fs = null;
Path dstFilePath = new Path(args[2]);
try {
fs = dstFilePath.getFileSystem(conf);
if (fs.exists(dstFilePath))
fs.delete(dstFilePath, true);
} catch (IOException e1) {
e1.printStackTrace();
}
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
我做错什么了?
在下一个阶段,我如何才能得到每个网站的前10个最相似的网站?
2条答案
按热度按时间yc0p9oo01#
我会这样做的。此外,您还可以通过在第二个作业的输出上编写第三个作业来进行排序,以获得前十个站点。(提示:您只需要编写Map器)注意:这适用于所提供的示例数据。您可能需要对格式错误的数据进行初始清理。
最终输出:
代码:
控制台标准输出:
wnrlj8wa2#
看来你的合路器出了问题。mapper&combiner的输出格式必须相同,这在您的情况下是不正确的。您是否可以将combiner注解为仅用于性能优化,并运行相同的程序。