Flink Count Windows

Posted by x33g5p2x on 2021-03-14 in Flink
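import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class CountWindowAll {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Each line received from the socket is expected to hold one integer.
        DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888);
        SingleOutputStreamOperator<Integer> mapped = stream.map((MapFunction<String, Integer>) Integer::valueOf).returns(Types.INT);
        // Non-keyed count window: fires once for every 5 elements of the whole stream.
        AllWindowedStream<Integer, GlobalWindow> countWindowAll = mapped.countWindowAll(5);
        // For a stream of plain Integers, position 0 refers to the value itself.
        SingleOutputStreamOperator<Integer> summed = countWindowAll.sum(0);
        summed.print();
        env.execute("CountWindowAll");
    }
}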

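mapped.countWindowAll(5) defines a non-keyed count window of size 5: the window fires each time five elements have arrived on the stream as a whole, and sum(0) emits their total. Because the stream is not keyed, this operator always runs with a parallelism of 1.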
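To make the firing behavior concrete, the following is a minimal plain-Java sketch that imitates a size-5 count window with a sum, without any Flink dependency. The class name CountWindowAllSketch and the method sumPerWindow are hypothetical names used only for illustration; this is a simulation of the semantics, not how Flink implements the window internally.

```java
import java.util.ArrayList;
import java.util.List;

public class CountWindowAllSketch {

    // Sums the input in consecutive, non-overlapping groups of `size` elements.
    // A trailing group with fewer than `size` elements produces no output,
    // imitating a count window that has not fired yet.
    public static List<Integer> sumPerWindow(List<Integer> input, int size) {
        List<Integer> results = new ArrayList<>();
        int count = 0;
        int sum = 0;
        for (int value : input) {
            sum += value;
            count++;
            if (count == size) {
                results.add(sum); // the window fires and is then cleared
                count = 0;
                sum = 0;
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
        System.out.println(sumPerWindow(input, 5)); // prints [15, 40]
    }
}
```

With twelve inputs, only two windows fire; the last two elements (11 and 12) stay buffered until three more arrive.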

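import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class CountWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888);
        // Each input line is expected to look like "key value", e.g. "a 1".
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped = stream.map((MapFunction<String, Tuple2<String, Integer>>) item -> {
            String[] data = item.split(" ");
            return Tuple2.of(data[0], Integer.valueOf(data[1]));
        }).returns(Types.TUPLE(Types.STRING, Types.INT));
        // Key by the first tuple field; a key selector replaces the deprecated keyBy(0).
        KeyedStream<Tuple2<String, Integer>, String> keyed = mapped.keyBy(t -> t.f0);
        // Keyed count window: fires per key once that key has received 5 elements.
        WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindow = keyed.countWindow(5);
        // Sum the second tuple field (position 1) within each window.
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = countWindow.sum(1);
        summed.print();
        env.execute("CountWindow");
    }
}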

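After the stream is keyed, a CountWindow fires only when the number of elements for a single key reaches the configured count (5 here). Each key is counted independently, so a key that has received fewer than five elements produces no output.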
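The per-key counting can be illustrated with a small plain-Java simulation, again without any Flink dependency. The names KeyedCountWindowSketch and sumPerKeyWindow are hypothetical, and the window size of 3 is chosen only to keep the sample input short; it is a sketch of the semantics under those assumptions rather than Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedCountWindowSketch {

    // Parses "key value" lines and keeps an independent counter and running sum per key.
    // A result is emitted for a key only when that key has collected `size` elements.
    public static List<Map.Entry<String, Integer>> sumPerKeyWindow(List<String> lines, int size) {
        Map<String, int[]> state = new HashMap<>(); // key -> {count, sum}
        List<Map.Entry<String, Integer>> results = new ArrayList<>();
        for (String line : lines) {
            String[] data = line.split(" ");
            String key = data[0];
            int value = Integer.parseInt(data[1]);
            int[] acc = state.computeIfAbsent(key, k -> new int[2]);
            acc[0]++;
            acc[1] += value;
            if (acc[0] == size) {
                results.add(Map.entry(key, acc[1])); // this key's window fires
                state.remove(key);                   // and its contents are cleared
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("a 1", "b 10", "a 2", "a 3", "b 20", "a 4", "b 30");
        System.out.println(sumPerKeyWindow(lines, 3)); // prints [a=6, b=60]
    }
}
```

Key a fires on its third element even though b has only one element at that point, and the later "a 4" stays buffered because a has not yet reached three elements again.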
