Java: "pivoting" a table with Hadoop

Posted by dldeef67 on 2021-06-03 in Hadoop

(Disclaimer: I am very new to Hadoop and Java.)
As input, I have a table with a simple key-value structure:

key1  value1
key2  value2
key3  value3
key2  value4
key1  value5
key1  value6

As output, I want to collect, for each key, all the values that belong to that key, like this:

key1, value1 value5 value6
key2, value2 value4
key3, value3
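
To make the target concrete, the grouping can be sketched in plain Java without Hadoop. This is only an in-memory illustration, assuming tab-separated key/value lines (the mapper in the question splits on tabs); the names PivotSketch and pivot are hypothetical and not part of the original post. In a MapReduce job the equivalent grouping happens in the shuffle between the map and reduce phases.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of the original post.
final class PivotSketch {

    // Groups "key<TAB>value" lines by key, keeping values in input order.
    // TreeMap iterates keys in sorted order.
    static Map<String, List<String>> pivot(List<String> lines) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split("\t", -1);
            if (fields.length < 2 || fields[0].isEmpty() || fields[1].isEmpty()) {
                continue; // skip malformed or incomplete rows
            }
            grouped.computeIfAbsent(fields[0], k -> new ArrayList<>()).add(fields[1]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
            "key1\tvalue1", "key2\tvalue2", "key3\tvalue3",
            "key2\tvalue4", "key1\tvalue5", "key1\tvalue6");
        // Print one line per key: the key, a comma, then the space-separated values
        pivot(input).forEach((k, v) -> System.out.println(k + ", " + String.join(" ", v)));
    }
}
```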

Here is my mapper:

public class WordMapper extends Mapper<Object, Text, Text, Text> {

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] fields = value.toString().split("\\t", -1);
        for (int i = 0; i < fields.length; ++i) {
            if ("".equals(fields[i])) fields[i] = null;
        }
        List<String> fields_list = Arrays.asList(fields);
        Text textKey = new Text(fields_list.get(0));
        Text textValue = new Text(fields_list.get(1));
        context.write(textKey, textValue);
    }
}

Here is the reducer:

public class SumReducer extends Reducer<Text, TextArrayWritable, Text, TextArrayWritable> {
    private TextArrayWritable valuesTotal = new TextArrayWritable();

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        ArrayList<Text> values_list = new ArrayList<Text>();

        for (Text value : values) {
            values_list.add(value);
        }
        Text[] values_arr = new Text[values_list.size()];
        values_arr = values_list.toArray(values_arr);

        valuesTotal.setFields(values_arr);
        context.write(key, valuesTotal);
    }
}

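For some reason, I cannot get any output from my program. It simply terminates and leaves nothing in the output folder. What am I doing wrong?
(I am using Hadoop 2.2.0 and Eclipse with the Hadoop plugin. The WordCount example runs fine.)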

Answer #1 (axkjgtzd)

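Problem solved. After enabling logging, it became clear that my data contained rows with a missing value in column 4 (fields[4]), so I added the null check if (fields[4] != null) and it worked. In addition, I got rid of the array-to-list conversion and of the custom TextArrayWritable class.
Mapper: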

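    // Body of the mapper class; the class declaration was not shown in the answer.
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        // Limit -1 keeps trailing empty columns instead of dropping them
        String[] fields = value.toString().split("\\t", -1);
        for (int i = 0; i < fields.length; ++i) {
            if ("".equals(fields[i])) fields[i] = null;
        }
        // Skip rows with no value in column 4; a null String cannot be wrapped in a Text
        if (fields[4] != null) {
            System.out.println(fields[0]); // debug output
            System.out.println(fields[4]);
            context.write(new Text(fields[0]), new Text(fields[4]));
        }
    }
}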
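
The effect of the null check can be tried outside Hadoop. The following is a minimal sketch, assuming tab-separated rows where the key is column 0 and the value is column 4, as in the mapper above; RowGuard and extract are hypothetical names. It also checks the array length and the key column, which are extra precautions not present in the original answer, for rows that have fewer than five columns or an empty key.

```java
import java.util.Optional;

// Hypothetical helper for illustration; not part of the original answer.
final class RowGuard {
    static final int KEY_COL = 0;
    static final int VALUE_COL = 4;

    // Returns {key, value} when both columns exist and are non-empty,
    // otherwise an empty Optional so the caller can skip the row.
    static Optional<String[]> extract(String line) {
        String[] fields = line.split("\t", -1); // -1 keeps trailing empty columns
        if (fields.length <= VALUE_COL) {
            return Optional.empty(); // row is too short (extra check, an assumption)
        }
        if (fields[KEY_COL].isEmpty() || fields[VALUE_COL].isEmpty()) {
            return Optional.empty(); // missing key or value
        }
        return Optional.of(new String[] { fields[KEY_COL], fields[VALUE_COL] });
    }
}
```

Inside a mapper, the same idea would amount to calling context.write only when the result is present.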

Reducer:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SongsReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Join all values for this key into one comma-separated string
        boolean first = true;
        StringBuilder songs = new StringBuilder();
        for (Text val : values) {
            if (!first) {
                songs.append(",");
            }
            first = false;
            songs.append(val.toString());
        }

        context.write(key, new Text(songs.toString()));
    }
}
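
A side effect of dropping TextArrayWritable is that the reducer's type parameters now agree with the parameter types of reduce, which the reducer in the question did not. The sketch below uses a hypothetical stand-in base class (BaseReducer, which is not Hadoop's Reducer) to illustrate why that agreement matters in Java: a method whose parameter types differ from the inherited one is an overload rather than an override, so a framework that calls the base method never reaches it. Whether this contributed to the missing output in the question is an assumption; the answer itself attributes the failure to the missing column values.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a framework base class; this is NOT Hadoop's Reducer.
class BaseReducer<K, V> {
    final List<String> output = new ArrayList<>();

    // Default behaviour: pass every value through unchanged.
    void reduce(K key, Iterable<V> values) {
        for (V value : values) {
            output.add(key + "\t" + value);
        }
    }

    // The "framework" entry point always calls the method declared above.
    final void run(K key, Iterable<V> values) {
        reduce(key, values);
    }
}

// The type parameter says Integer, but the method takes Iterable<String>:
// this declares an overload, so run() still executes the default reduce.
class MismatchedReducer extends BaseReducer<String, Integer> {
    void reduce(String key, Iterable<String> values) {
        output.add(key + "\t" + String.join(",", values));
    }
}

// Type parameters and method signature agree, so this is a real override.
// @Override makes the compiler reject the method if it overrides nothing.
class MatchingReducer extends BaseReducer<String, String> {
    @Override
    void reduce(String key, Iterable<String> values) {
        output.add(key + "\t" + String.join(",", values));
    }
}
```

Adding @Override to MismatchedReducer.reduce would turn the silent mismatch into a compile-time error, which is the main reason to annotate reduce and map methods.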
