我有一个Kafka的信息类似于以下模式: { user: 'someUser', value: 'SomeValue' , timestamp:000000000}
使用flink流计算,对这些项目执行一些计数操作。
现在我要声明一个会话,在x秒范围内收集与单个会话相同的user+值,并使用最新的时间戳,然后它将只转发到下一个流一次
所以我写了这样的东西:
data.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Data>() {
.....
})
.keyBy(new KeySelector<Data, String>(){
.......
})
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.aggregate(new AggregateFunction<Data, Data, Data>() {
@Override
public Data createAccumulator() {
return null;
}
@Override
public Data add(Data value, Data accumulator) {
if(accumulator == null) {
accumulator = value;
}
return accumulator;
}
@Override
public Data getResult(Data accumulator) {
return accumulator;
}
@Override
public Data merge(Data a, Data b) {
return a;
}
});
但问题是getresult函数是在每个元素上调用的,而不仅仅是在窗口的末尾。
我的问题是如何在窗口结束之前不将聚合结果转发到下一个流。据我所知,当没有更多元素时,进程流结果也在向前移动,即使窗口不是end yes
有什么建议吗?
谢谢
1条答案
按热度按时间h22fl7wq1#
flink提供了两种不同的方法来评估windows。在这种情况下,您要使用另一个。
一种方法是递增地评估每个窗口的内容。这就是你得到的
reduce
以及aggregate
. 当元素指定给窗口时ReduceFunction
或者AggregateFunction
被调用,该元素立即对最终结果作出贡献。另一种方法是使用
process
用一个ProcessWindowFunction
. 使用这种方法,直到窗口完成后才计算窗口,此时ProcessWindowFunction
用一个Iterable
包含分配给窗口的所有元素。这样做的缺点是在触发窗口之前需要存储所有元素,如果ProcessWindowFunction
必须做大量的工作来计算可能暂时中断管道的结果,但是有些计算需要这样做——比如计算不同的元素。有关更多信息,请参阅文档。