flink会话窗口结束时获取结果

b1zrtrql  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(321)

我有一个Kafka的信息类似于以下模式: { user: 'someUser', value: 'SomeValue' , timestamp:000000000} 使用flink流计算,对这些项目执行一些计数操作。
现在我要声明一个会话,在x秒范围内收集与单个会话相同的user+值,并使用最新的时间戳,然后它将只转发到下一个流一次
所以我写了这样的东西:

data.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Data>() {
        .....
    })
    .keyBy(new KeySelector<Data, String>(){

        .......
    })
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .aggregate(new AggregateFunction<Data, Data, Data>() {

        @Override
        public Data createAccumulator() {
            return null;
        }

        @Override
        public Data add(Data value, Data accumulator) {
            if(accumulator == null) {
                accumulator = value;
            }
            return accumulator;

        }

        @Override
        public Data getResult(Data accumulator) {
            return accumulator;
        }

        @Override
        public Data merge(Data a, Data b) {
            return a;
        }
   });

但问题是getresult函数是在每个元素上调用的,而不仅仅是在窗口的末尾。
我的问题是如何在窗口结束之前不将聚合结果转发到下一个流。据我所知,当没有更多元素时,进程流结果也在向前移动,即使窗口不是end yes
有什么建议吗?
谢谢

h22fl7wq

h22fl7wq1#

flink提供了两种不同的方法来评估windows。在这种情况下,您要使用另一个。
一种方法是递增地评估每个窗口的内容。这就是你得到的 reduce 以及 aggregate . 当元素指定给窗口时 ReduceFunction 或者 AggregateFunction 被调用,该元素立即对最终结果作出贡献。
另一种方法是使用 process 用一个 ProcessWindowFunction . 使用这种方法,直到窗口完成后才计算窗口,此时 ProcessWindowFunction 用一个 Iterable 包含分配给窗口的所有元素。这样做的缺点是在触发窗口之前需要存储所有元素,如果 ProcessWindowFunction 必须做大量的工作来计算可能暂时中断管道的结果,但是有些计算需要这样做——比如计算不同的元素。
有关更多信息,请参阅文档。

相关问题