Flink 窗口关闭时间

wbgh16ku  于 12个月前  发布在  Apache
关注(0)|答案(1)|浏览(125)

代码如下。根据我的理解,当水印大于窗口结束时间时,窗口将关闭。但我的测试结果非常混乱。

public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> ds = env.socketTextStream("192.168.10.100", 9999);

        SingleOutputStreamOperator<T> map = ds.map(x -> {
            String[] s = x.split(" ");
            return new T(s[0], Integer.parseInt(s[1]));
        });
        WatermarkStrategy<T> wms = WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((t, l) -> {
                    LocalDateTime now = LocalDateTime.now().minusSeconds(10);
                    Date from = Date.from(now.atZone(ZoneId.systemDefault()).toInstant());
                    return from.getTime();
                });

        map.assignTimestampsAndWatermarks(wms)
                .keyBy(T::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<T, List<T>, String, TimeWindow>() {
                    @Override
                    public void process(String s,
                                        ProcessWindowFunction<T, List<T>, String, TimeWindow>.Context context,
                                        Iterable<T> elements, Collector<List<T>> out) throws Exception {
                        System.out.println(context.window().getStart() + "----" + context.window().getEnd() + "----" + context.window().maxTimestamp());
                        List<T> ts = new ArrayList<>();
                        for (T element : elements) {
                            ts.add(element);
                        }

                        out.collect(ts);
                    }
                }).print();
        env.execute();
    }
}

字符串
测试结果
1702991405520 1702991406914 1702991408207 1702991409720 1702991411758 1702991413702 1702991414803 1702991418426 1702991419629 1702991420617 1702991421944 1702991423171 1702991424718 1702991426117 1702991427736 1702991429248 1702991405000-1702991410000-170299140999
窗口为1702991405000-1702991410000,但为什么当水印到达1702991429248时窗口关闭并调用处理方法?
请帮助我理解

vulvrdjw

vulvrdjw1#

从你分享的内容中,无法看到1702991429248来自哪里,但这个值不是触发窗口的水印。

相关问题