使用map reduce连接多个文件

zzlelutf  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(306)

连接2个文件的内容:
第一个文件(包含员工姓名数据)

id,name
101,Gaurav
102,Rohit
103,Karishma
104,Darshan
105,Divya

第二个文件(包含员工部门数据)

id,dept
101,Sales
102,Research
103,NMG
104,Admin
105,HR

==========================
输出

id,name,dept
101,Gaurav,Sales
102,Rohit,Research
103,Karishma,NMG

如何获得这种输出?
现在我把它作为随机值放在减速机里,比如。。
我想在指定的顺序,如id,名称,部门输出。谢谢你的帮助。
Map器类如下所示。。。

public class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
private Text keyEmit = new Text();
private Text valEmit = new Text();
public void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException
{
 String line=value.toString();
 String[] words=line.split(",");
 keyEmit.set(words[0]);
 valEmit.set(words[1]);
 context.write(keyEmit, valEmit);
}
}

减速机类看起来像这样。。。

public class JoinReducer extends Reducer<Text, Text, NullWritable, Text> {
String merge = "";
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    merge = key.toString(); // 101
    for(Text value : values) {
        merge +=  "," + value.toString();
    }
    context.write(NullWritable.get(), new Text(merge));
}
}

驱动程序类看起来像这样。。。

public class JoinDriver {
public final static void main(final String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "Multiple join");

    job.setJarByClass(JoinDriver.class);
    // job.setMapperClass(JoinMapper.class);
    job.setReducerClass(JoinReducer.class);

    MultipleInputs.addInputPath(job, new Path(args[0]),
            TextInputFormat.class, JoinMapper.class);

    MultipleInputs.addInputPath(job, new Path(args[1]),
            TextInputFormat.class, JoinMapper.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

现在的输出如下,我希望它像id,name,department那样按顺序排列。

polhcujo

polhcujo1#

您的主要问题是值没有排序,因此您在一个公共键上分组,但是仅仅将值作为字符串发送并没有很大帮助,因为您不知道哪个是名称,哪个是部门。
您有几个选项,所有这些选项都需要从Map器发送更多信息:
使用辅助排序
对减速器中的值进行排序
最快的方法是在Map器中输出值时向该值附加更多信息(理想情况下,实际使用包含两个文本对象的复合值)。

public class JoinMapperName extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable k, Text value, Context context) 
                          throws IOException, InterruptedException {

        String[] words = value.toString().split(",");
        context.write(new Text(words[0]), new Text("name:" + words[1]));
    }
}

public class JoinMapperDept extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable k, Text value, Context context) 
                          throws IOException, InterruptedException {

        String[] words = value.toString().split(",");
        context.write(new Text(words[0]), new Text("dept:" + words[1]));
    }
}

所以现在每个数据源都有一个不同的Map器。你需要把减速机改成:

public class JoinReducer extends Reducer<Text, Text, NullWritable, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context) 
              throws IOException, InterruptedException {

        String name = "";
        String dept = "";
        for(Text value : values) {
            if (value.toString().startsWith("name")) {
                name = value.toString().split(":")[1];
            } else {
                dept = value.toString().split(":")[1];
            }
        }
        String merge = key + "," name + "," + dept;
        context.write(NullWritable.get(), new Text(merge));
    }
}

这只是一个简单的例子。希望它能给你一些关于如何执行命令的想法。

相关问题