Flink Operators: KeyBy

Posted by x33g5p2x on 2021-03-14 in Flink

Using tuples

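import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3));
        // Turn each number into a (word, 1) pair.
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapped = streamSource.map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Integer item) throws Exception {
                return Tuple2.of(item, 1);
            }
        }).returns(Types.TUPLE(Types.INT, Types.INT));
        // Key by tuple position 0. Index-based keyBy is deprecated in recent Flink 1.x releases in favor of a KeySelector.
        KeyedStream<Tuple2<Integer, Integer>, Tuple> stream = mapped.keyBy(0);
        stream.print();
        env.execute("WordCountStreamingJob");
    }
}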
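
To make the effect of keying more concrete, here is a minimal plain-Java sketch (no Flink classes) of the idea that records with the same key always land on the same parallel subtask. The `subtaskFor` helper and the parallelism of 4 are hypothetical, and the modulo rule is a simplified stand-in: Flink's real assignment goes through its own hashing and key groups, so the actual subtask numbers will differ.

```java
import java.util.Arrays;

// Plain-Java illustration only; this is not Flink's routing code.
class KeyRoutingSketch {

    // Simplified stand-in for key-to-subtask assignment: equal keys
    // have equal hash codes, so they always map to the same index.
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4; // hypothetical number of parallel subtasks
        for (int item : Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3)) {
            // Print the chosen index in front of the record.
            System.out.println(subtaskFor(item, parallelism) + "> (" + item + ",1)");
        }
    }
}
```

In this sketch every `(1,1)` record is printed with the same index, and likewise for 2 and 3, which is the property the keyed stream relies on.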

Using a custom bean

Define a bean named WordCount:
package com.gosuncn;

public class WordCount {
    public Integer word;
    public Integer count;

    public WordCount() {
    }

    public WordCount(Integer word, Integer count) {
        this.word = word;
        this.count = count;
    }

    public static WordCount of(Integer word, Integer count) {
        return new WordCount(word, count);
    }

    @Override
    public String toString() {
        return "WordCount{" +
                "word=" + word +
                ", count=" + count +
                '}';
    }
}
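Group by the word field of WordCount. Positional indexes (0, 1, 2, 3 ...) cannot be used with a bean; the key must be given by field name.
package com.gosuncn;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3));
        // Wrap each number in a WordCount bean with an initial count of 1.
        SingleOutputStreamOperator<WordCount> mapped = streamSource.map(new MapFunction<Integer, WordCount>() {
            @Override
            public WordCount map(Integer item) throws Exception {
                return WordCount.of(item, 1);
            }
        }).returns(WordCount.class);
        // Key by the bean's public field name instead of a tuple position.
        KeyedStream<WordCount, Tuple> stream = mapped.keyBy("word");
        stream.print();
        env.execute("WordCountStreamingJob");
    }
}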
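
Conceptually, keying a bean by a field means extracting that field from each record and treating records with equal values as one group. The following is a minimal plain-Java sketch of that idea, not Flink code: the `groupBy` helper is hypothetical, the nested `WordCount` is a trimmed-down copy of the bean above, and Flink itself partitions a running stream rather than building a map in memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Plain-Java illustration only; no Flink classes are used here.
class KeyByFieldSketch {

    // Trimmed-down copy of the article's bean.
    static class WordCount {
        public Integer word;
        public Integer count;

        WordCount(Integer word, Integer count) {
            this.word = word;
            this.count = count;
        }
    }

    // Groups records by the key the extractor returns, keeping arrival order within each group.
    static <T, K extends Comparable<K>> Map<K, List<T>> groupBy(List<T> records, Function<T, K> keyExtractor) {
        Map<K, List<T>> groups = new TreeMap<>();
        for (T record : records) {
            groups.computeIfAbsent(keyExtractor.apply(record), k -> new ArrayList<>()).add(record);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<WordCount> records = new ArrayList<>();
        for (int item : Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3)) {
            records.add(new WordCount(item, 1));
        }
        // The extractor plays the role of the "word" field name.
        Map<Integer, List<WordCount>> groups = groupBy(records, wc -> wc.word);
        groups.forEach((word, members) -> System.out.println(word + " -> " + members.size() + " records"));
    }
}
```

With the sample input, each of the three words ends up in its own group of three records.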

Grouping by multiple fields

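import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource = env.fromCollection(Arrays.asList("江苏 南京", "江苏 徐州", "湖南 长沙", "广东 广州", "辽宁 沈阳", "广东 广州", "湖南 长沙", "江苏 南京", "辽宁 沈阳", "湖南 张家界"));
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> mapped = streamSource.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> map(String item) throws Exception {
                String[] parts = item.split(" "); // "province city"
                return Tuple3.of(parts[0], parts[1], 1);
            }
        }).returns(Types.TUPLE(Types.STRING, Types.STRING, Types.INT));
        // First type parameter: the element type after keying; second: the type of the key.
        KeyedStream<Tuple3<String, String, Integer>, Tuple> keyed = mapped.keyBy(0, 1);
        // Rolling sum of field 2 for each (province, city) key.
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> summed = keyed.sum(2);
        summed.print();
        env.execute("WordCountStreamingJob");
    }
}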
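
To show what a rolling sum over a two-field key produces, here is a minimal plain-Java sketch (no Flink classes) that mimics the behavior of keying on fields 0 and 1 and summing field 2: one running total per (province, city) pair, with the updated total emitted for every incoming record. The `rollingSum` helper is hypothetical, the place names are romanized versions of the sample data, and in a real job the interleaving of output lines across keys can vary with parallelism.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; no Flink classes are used here.
class RollingSumSketch {

    // Keeps one running total per (province, city) pair and records
    // the updated total each time a record for that pair arrives.
    static List<String> rollingSum(List<String> lines) {
        Map<List<String>, Integer> totals = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            String[] parts = line.split(" ");
            // Composite key: both fields must match for records to share a group.
            List<String> key = Arrays.asList(parts[0], parts[1]);
            int total = totals.merge(key, 1, Integer::sum);
            emitted.add("(" + parts[0] + "," + parts[1] + "," + total + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "Jiangsu Nanjing", "Jiangsu Xuzhou", "Hunan Changsha", "Guangdong Guangzhou",
                "Liaoning Shenyang", "Guangdong Guangzhou", "Hunan Changsha", "Jiangsu Nanjing",
                "Liaoning Shenyang", "Hunan Zhangjiajie");
        rollingSum(input).forEach(System.out::println);
    }
}
```

In this sketch the pairs that occur twice reach a total of 2 on their second record, while Jiangsu Xuzhou and Hunan Zhangjiajie stay at 1. Sharing only the province is not enough to share a total.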

The same applies to a custom bean: positional indexes cannot be used, so specify the keys by the bean's field names instead.
