Java MapReduce: counting by date

t8e9dugd  posted on 2021-06-01 in Hadoop

I'm new to Hadoop, and I'm trying to write a MapReduce program that counts, per date (grouped by month), the two most frequent items. My input looks like this:

2017-06-01 , A, B, A, C, B, E, F 
2017-06-02 , Q, B, Q, F, K, E, F
2017-06-03 , A, B, A, R, T, E, E 
2017-07-01 , A, B, A, C, B, E, F
2017-07-05 , A, B, A, G, B, G, G

So I expect the MapReduce program to produce a result like this:

2017-06,  A:4, E:4
2017-07,  A:4, B:4

My code so far:

public class ArrayGiulioTest {

    public static Logger logger = Logger.getLogger(ArrayGiulioTest.class);

    public static class CustomMap extends Mapper<LongWritable, Text, Text, TextWritable> {
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            TextWritable array = new TextWritable();
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line, ",");
            String dataAttuale = tokenizer.nextToken().substring(0,
                    line.lastIndexOf("-"));

            Text tmp = null;
            Text[] tmpArray = new Text[tokenizer.countTokens()];
            int i = 0;
            while (tokenizer.hasMoreTokens()) {
                String prod = tokenizer.nextToken(",");

                word.set(dataAttuale);
                tmp = new Text(prod);
                tmpArray[i] = tmp;

                i++;
            }

            array.set(tmpArray);

            context.write(word, array);

        }
    }

    public static class CustomReduce extends Reducer<Text, TextWritable, Text, Text> {

        public void reduce(Text key, Iterator<TextWritable> values,
                Context context) throws IOException, InterruptedException {

            MapWritable map = new MapWritable();
            Text txt = new Text();

            while (values.hasNext()) {
                TextWritable array = values.next();
                Text[] tmpArray = (Text[]) array.toArray();
                for(Text t : tmpArray) {
                    if(map.get(t)!= null) {
                        IntWritable val = (IntWritable) map.get(t);
                        map.put(t, new IntWritable(val.get()+1));
                    } else {
                        map.put(t, new IntWritable(1));
                    }
                }

            }

            Set<Writable> set = map.keySet();
            StringBuffer str = new StringBuffer();
            for(Writable k : set) {

                str.append("key: " + k.toString() + " value: " + map.get(k) + "**");
            }
            txt.set(str.toString());

            context.write(key, txt);
        }
    }

    public static void main(String[] args) throws Exception {
        long inizio = System.currentTimeMillis();
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "countProduct");
        job.setJarByClass(ArrayGiulioTest.class);

        job.setMapperClass(CustomMap.class);
        //job.setCombinerClass(CustomReduce.class);
        job.setReducerClass(CustomReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TextWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
        long fine = System.currentTimeMillis();
        logger.info("**************************************End" + (fine - inizio));
        System.exit(1);
    }

}

I implemented my custom TextWritable this way:

public class TextWritable extends ArrayWritable {

    public TextWritable() {
        super(Text.class);
    }
}

...so when I run the MapReduce program, I get this result:

2017-6    wordcount.TextWritable@3e960865
2017-6    wordcount.TextWritable@3e960865

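Clearly my reducer is broken: the result looks like the output of my mapper.
Any ideas? Can someone tell me whether this is the right approach?
Here is the console log (FYI, my input file has 6 lines rather than 5). I get the same result whether I launch the MapReduce program from Eclipse (single JVM) or run it on Hadoop with HDFS.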

File System Counters
    FILE: Number of bytes read=1216
    FILE: Number of bytes written=431465
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
Map-Reduce Framework
    Map input records=6
    Map output records=6
    Map output bytes=214
    Map output materialized bytes=232
    Input split bytes=97
    Combine input records=0
    Combine output records=0
    Reduce input groups=3
    Reduce shuffle bytes=232
    Reduce input records=6
    Reduce output records=6
    Spilled Records=12
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=0
    Total committed heap usage (bytes)=394264576
Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
File Input Format Counters 
    Bytes Read=208
File Output Format Counters 
    Bytes Written=1813
ryoqjall #1

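The main problem was the signature of the reduce method. I had written:

public void reduce(Text key, Iterator<TextWritable> values, Context context)

instead of:

public void reduce(Text key, Iterable<ArrayTextWritable> values,

Because the Iterator version does not override Reducer.reduce, Hadoop ran the default reduce, which simply writes every incoming value back out under its key. That is why I was getting the map output rather than the reduce output. Annotating the method with @Override would have turned this mistake into a compile error.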
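
The overload-versus-override trap described above can be reproduced without Hadoop. The following is a minimal sketch: BaseReducer is a hypothetical stand-in for Hadoop's Reducer (it is not the real class), and its default reduce passes values through unchanged, as the real default does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class OverrideDemo {

    // Hypothetical stand-in for Hadoop's Reducer: the default reduce() is a pass-through.
    static class BaseReducer {
        final List<String> out = new ArrayList<>();

        // The "framework" always calls this Iterable-based method.
        protected void reduce(String key, Iterable<String> values) {
            for (String v : values) {
                out.add(key + "\t" + v);
            }
        }

        // Mimics the framework driving the reducer for one key group.
        final List<String> run(String key, List<String> values) {
            reduce(key, values);
            return out;
        }
    }

    // Iterator instead of Iterable: this declares a new overload, not an override.
    // Adding @Override here would fail to compile, which exposes the mistake early.
    static class OverloadedReducer extends BaseReducer {
        protected void reduce(String key, Iterator<String> values) {
            out.add(key + "\tcustom logic ran");
        }
    }

    // Matching signature: the framework's call is dispatched to this method.
    static class OverridingReducer extends BaseReducer {
        @Override
        protected void reduce(String key, Iterable<String> values) {
            int n = 0;
            for (String ignored : values) {
                n++;
            }
            out.add(key + "\t" + n + " values");
        }
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("a", "b");
        // The custom logic never runs; the values pass through untouched.
        System.out.println(new OverloadedReducer().run("2017-06", values));
        // The custom logic runs once for the key group.
        System.out.println(new OverridingReducer().run("2017-06", values));
    }
}
```

With the overloaded version, the result has one record per input value, which matches the symptom in the question's counters (Reduce input records=6, Reduce output records=6).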

pgky5nke #2

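I think you're trying to do too much work in the mapper. You only need to group by date (and, judging by the expected output, you don't seem to be formatting the date correctly).
For example, the approach below turns these lines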

2017-07-01 , A, B, A, C, B, E, F
2017-07-05 , A, B, A, G, B, G, G

into this key/values pair for the reducer

2017-07 , ("A,B,A,C,B,E,F", "A,B,A,G,B,G,G")

In other words, rather than using ArrayWritable, just keep the values as Text.
So the mapper looks like this

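import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

class CustomMap extends Mapper<LongWritable, Text, Text, Text> {

    private final Text key = new Text();
    private final Text output = new Text();

    @Override
    protected void map(LongWritable offset, Text value, Context context) throws IOException, InterruptedException {

        final String valueStr = value.toString();
        // Use String.indexOf rather than Text.find: Text.find returns a byte offset,
        // which only matches the character index for pure ASCII input.
        int separatorIndex = valueStr.indexOf(',');
        if (separatorIndex < 0) {
            System.err.printf("mapper: not enough records for %s%n", valueStr);
            return;
        }
        // Everything before the first comma is the date; the rest is the token list.
        String dateKey = valueStr.substring(0, separatorIndex).trim();
        String tokens = valueStr.substring(1 + separatorIndex).trim().replaceAll("\\p{Space}", "");

        SimpleDateFormat fmtFrom = new SimpleDateFormat("yyyy-MM-dd");
        SimpleDateFormat fmtTo = new SimpleDateFormat("yyyy-MM");

        try {
            // Reformat the full date to year-month so that records group by month.
            dateKey = fmtTo.format(fmtFrom.parse(dateKey));
            key.set(dateKey);
        } catch (ParseException ex) {
            System.err.printf("mapper: invalid key format %s%n", dateKey);
            return;
        }

        output.set(tokens);
        context.write(key, output);
    }
}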
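
As an aside, the month key can also be derived with java.time instead of SimpleDateFormat. This is a swapped-in alternative, not what the mapper above does, and monthKey is a hypothetical helper name used only for illustration.

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.time.format.DateTimeParseException;
import java.util.Optional;

class MonthKeyDemo {

    // Hypothetical helper: returns the year-month part of an ISO date such as
    // 2017-06-01, or an empty Optional when the input cannot be parsed.
    static Optional<String> monthKey(String rawDate) {
        try {
            LocalDate date = LocalDate.parse(rawDate.trim());
            // YearMonth.toString() always zero-pads the month.
            return Optional.of(YearMonth.from(date).toString());
        } catch (DateTimeParseException ex) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(monthKey("2017-06-01 "));
        System.out.println(monthKey("not a date"));
    }
}
```

One design difference to keep in mind: LocalDate.parse is strict about the ISO format, whereas SimpleDateFormat is lenient by default, so input with unpadded months or days may be treated differently by the two approaches.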

The reducer can then build a Map that collects and counts the tokens from the value strings. Again, it writes only Text.

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

class CustomReduce extends Reducer<Text, Text, Text, Text> {

    private final Text output = new Text();

    @Override
    protected void reduce(Text date, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        // TreeMap keeps the tokens sorted alphabetically in the output.
        Map<String, Integer> keyMap = new TreeMap<>();
        for (Text v : values) {
            String[] keys = v.toString().trim().split(",");

            for (String key : keys) {
                // Start at 1 for a new token, otherwise add 1 to the existing count.
                keyMap.merge(key, 1, Integer::sum);
            }
        }

        output.set(mapToString(keyMap));
        context.write(date, output);
    }

    // Formats the counts as "A:4, B:4, ...".
    private String mapToString(Map<String, Integer> map) {
        StringBuilder sb = new StringBuilder();
        String delimiter = ", ";
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            sb.append(
                    String.format("%s:%d", entry.getKey(), entry.getValue())
            ).append(delimiter);
        }
        // Drop the trailing delimiter.
        sb.setLength(sb.length() - delimiter.length());
        return sb.toString();
    }
}

With your input, I got

2017-06 A:4, B:4, C:1, E:4, F:3, K:1, Q:2, R:1, T:1
2017-07 A:4, B:4, C:1, E:1, F:1, G:3
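
The output above lists every count, while the question asks for only the two most frequent items per month. Below is a minimal sketch of that last step in plain Java. The helper name topN and the tie-breaking rule (alphabetical by token) are assumptions; the question does not say how ties should be resolved, and in June A, B and E all tie at 4.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class TopTwoDemo {

    // Hypothetical helper: keeps the n entries with the highest counts.
    // Ties are broken alphabetically by token (an assumption, see above).
    static String topN(Map<String, Integer> counts, int n) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder())
                        .thenComparing(Map.Entry.<String, Integer>comparingByKey()))
                .limit(n)
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        // Counts for 2017-07, taken from the reducer output shown above.
        Map<String, Integer> july = new TreeMap<>();
        july.put("A", 4);
        july.put("B", 4);
        july.put("C", 1);
        july.put("E", 1);
        july.put("F", 1);
        july.put("G", 3);
        System.out.println("2017-07 " + topN(july, 2)); // prints "2017-07 A:4, B:4"
    }
}
```

In the reducer, a helper like this could replace mapToString so that only the top two entries are written for each month.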
