我有两个数据集。两者都在下面给出
第一个数据集
1 A
2 B
3 C
4 D
5 E
第二个数据集
1 ALPHA
2 BRAVO
3 CHARLIE
4 DELTA
5 ECHO
我想使用reduce-side-join连接这个数据集
最终的数据应该是这样的
A ALPHA
B BRAVO
C CHARLIE
D DELTA
E ECHO
我写了以下代码
Map器(从第一个数据集提取数据)
public class indMapper extends Mapper<Object, Text,IntWritable, Text> {
private String tokens[];
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
tokens=value.toString().split("\t");
context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m1"+"\t"+tokens[1].trim()));
}
}
Map器(从第二个数据集提取数据)
public class AlphaMapper extends Mapper<Object, Text, IntWritable, Text> {
private String tokens[];
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
tokens=value.toString().split("\t");
context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m2"+"\t"+tokens[1].trim()));
}
}
减速器(根据需要连接数据)
public class JoinReducer extends Reducer<IntWritable, Text, Text, Text> {
private String output1=new String();
private String output2=new String();
private TreeMap<String,String> x1=new TreeMap<String,String>();
private String tokens[];
public void reduce(IntWritable key,Text value,Context context)throws IOException,InterruptedException{
tokens=value.toString().split("\t");
if(tokens[0].contains("m1"))
{
output1=tokens[1];
}else if(tokens[0].contains("m2"))
{
output2=(tokens[1]);
}
x1.put(output2, output1);
cleanup(context);
}
public void cleanup(Context context)throws IOException,InterruptedException{
for(Entry y:x1.entrySet())
{
context.write(new Text(" "), new Text(y.getKey().toString()+","+y.getValue().toString()));
}
}
}
在driver类中,包含以下行
MultipleInputs.addInputPath(j, new Path(arg0[0]),TextInputFormat.class,indMapper.class);
MultipleInputs.addInputPath(j, new Path(arg0[1]),TextInputFormat.class,AlphaMapper.class);
我得到的输出与下面给出的一样,完全不符合要求。
1 m1 A
1 m2 ALPHA
2 m2 BRAVO
2 m1 B
3 m1 C
3 m2 CHARLIE
4 m2 DELTA
4 m1 D
5 m1 E
5 m2 ECHO
我完全搞不懂为什么索引会被打印出来,即使我还没有把索引包含在 context.write()
我甚至使用了cleanup(),仍然得到相同的结果。请建议如何获得所需的结果,如上所述。
最衷心的感谢让我摆脱困境的人:)
稍后经过一些修改,我得到了这个输出
m1 E
m1 D
m1 C
m1 B
m1 A
m2 ECHO
m2 DELTA
m2 CHARLIE
m2 BRAVO
m2 ALPHA
2条答案
按热度按时间7d7tgy0s1#
修改后的代码可能如下所示
oprakyz72#
reducer方法应该将key和iterable值作为参数。每个减速机的数据格式如下
{1,{“m1a”,“m2 alpha”},{1,{“m2 ba”,“m2 bravo”}。
请重新检查减速机方法的签名。我假设一旦这个问题解决了,如果你的数据是一对一的,你可以相应地Map。如果是一对多,那么您可能有多个m1或m2,为此,您需要决定如何管理多个值(map可以保持逗号分隔,或者在json或xml字符串中),然后输出最终值。