我正在实施一个 AggregateFunction
测量两个事件之间的持续时间 .window(EventTimeSessionWindows.withGap(gap))
. 处理完第二个事件后,窗口关闭。
flink会自动检查 AggregateFunction
使收集器中的现有数据不会因重新启动而丢失?
因为我不确定。我试图实现 AggregatingState
在一个 RichAggregateFunction
:
class MyAgg extends RichAggregateFunction<IN, ACC, OUT> AggregatingState
要求 AggregatingStateDescriptor
. 其构造函数具有以下签名:
String name,
AggregateFunction<IN, ACC, OUT> aggFunction,
Class<ACC> stateType) {
我很困惑 aggFunction
. 这里应该放什么?是不是 MyAgg
我首先要定义的是什么?
1条答案
按热度按时间nfg76nw01#
一
AggregateFunction
没有任何状态。但是流窗口中使用的聚合状态(由AggregateFunction
)作为窗口状态的一部分进行检查点。一
RichAggregateFunction
不能在窗口上下文中使用,并且AggregateFunction
不能有自己的状态。这样设计是因为AggregateFunction
允许使用状态描述符来定义ValueState
,例如,该状态将不可合并——为了保持窗口api合理的干净,所有窗口状态都需要可合并(为了会话窗口)。AggregatingState
你可以用在KeyedProcessFunction
,例如。在这种情况下,您需要定义如何将元素聚合到累加器中(即AggregatingState
),你可以用AggregateFunction
.