停留在reduce连接代码中

btxsgosb  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(290)

我有两个数据集。两者都在下面给出
第一个数据集

1   A
2   B
3   C
4   D
5   E

第二个数据集

1   ALPHA
2   BRAVO
3   CHARLIE
4   DELTA
5   ECHO

我想使用reduce-side-join连接这个数据集
最终的数据应该是这样的

A   ALPHA
B   BRAVO
C   CHARLIE
D   DELTA
E   ECHO

我写了以下代码
Map器(从第一个数据集提取数据)

public class indMapper extends Mapper<Object, Text,IntWritable, Text> {
    private String tokens[];
    public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
        tokens=value.toString().split("\t");
        context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m1"+"\t"+tokens[1].trim()));
    }
}

Map器(从第二个数据集提取数据)

public class AlphaMapper extends Mapper<Object, Text, IntWritable, Text> {
        private String tokens[];
        public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
            tokens=value.toString().split("\t");
            context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m2"+"\t"+tokens[1].trim()));
        }
}

减速器(根据需要连接数据)

public class JoinReducer extends Reducer<IntWritable, Text, Text, Text> {
    private String output1=new String();
    private String output2=new String();
    private TreeMap<String,String> x1=new TreeMap<String,String>();
    private String tokens[];
    public void reduce(IntWritable key,Text value,Context context)throws IOException,InterruptedException{
            tokens=value.toString().split("\t");
            if(tokens[0].contains("m1"))
            {
                output1=tokens[1];
            }else if(tokens[0].contains("m2"))
            {
                output2=(tokens[1]);
            }
            x1.put(output2, output1);
        cleanup(context);

}
    public void cleanup(Context context)throws IOException,InterruptedException{

        for(Entry y:x1.entrySet())
        {
            context.write(new Text(" "), new Text(y.getKey().toString()+","+y.getValue().toString()));
        }
    }
}

在driver类中,包含以下行

MultipleInputs.addInputPath(j, new Path(arg0[0]),TextInputFormat.class,indMapper.class);
MultipleInputs.addInputPath(j, new Path(arg0[1]),TextInputFormat.class,AlphaMapper.class);

我得到的输出与下面给出的一样,完全不符合要求。

1       m1      A
1       m2      ALPHA
2       m2      BRAVO
2       m1      B
3       m1      C
3       m2      CHARLIE
4       m2      DELTA
4       m1      D
5       m1      E
5       m2      ECHO

我完全搞不懂为什么索引会被打印出来,即使我还没有把索引包含在 context.write() 我甚至使用了cleanup(),仍然得到相同的结果。请建议如何获得所需的结果,如上所述。
最衷心的感谢让我摆脱困境的人:)
稍后经过一些修改,我得到了这个输出

m1      E
m1      D
m1      C
m1      B
m1      A
m2      ECHO
m2      DELTA
m2      CHARLIE
m2      BRAVO
m2      ALPHA
7d7tgy0s

7d7tgy0s1#

修改后的代码可能如下所示

public void reduce(IntWritable key,Iterabale<Text> values,Context context)throws IOException,InterruptedException{

               for(Text value : values) {
                tokens=values.toString().split("\t");
                if(tokens[0].contains("m1"))
                {
                    output1=tokens[1];
                }else if(tokens[0].contains("m2"))
                {
                    output2=(tokens[1]);
                }
                x1.put(output2, output1);
               }
            cleanup(context);

    }
oprakyz7

oprakyz72#

reducer方法应该将key和iterable值作为参数。每个减速机的数据格式如下
{1,{“m1a”,“m2 alpha”},{1,{“m2 ba”,“m2 bravo”}。
请重新检查减速机方法的签名。我假设一旦这个问题解决了,如果你的数据是一对一的,你可以相应地Map。如果是一对多,那么您可能有多个m1或m2,为此,您需要决定如何管理多个值(map可以保持逗号分隔,或者在json或xml字符串中),然后输出最终值。

相关问题