flink是否自动检查aggregatefunction的状态以及如何使用aggregatingstatedescriptor?

hwamh0ep  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(622)

我正在实施一个 AggregateFunction 测量两个事件之间的持续时间 .window(EventTimeSessionWindows.withGap(gap)) . 处理完第二个事件后,窗口关闭。
flink会自动检查 AggregateFunction 使收集器中的现有数据不会因重新启动而丢失?
因为我不确定。我试图实现 AggregatingState 在一个 RichAggregateFunction :
class MyAgg extends RichAggregateFunction<IN, ACC, OUT> AggregatingState 要求 AggregatingStateDescriptor . 其构造函数具有以下签名:

String name,
            AggregateFunction<IN, ACC, OUT> aggFunction,
            Class<ACC> stateType) {

我很困惑 aggFunction . 这里应该放什么?是不是 MyAgg 我首先要定义的是什么?

nfg76nw0

nfg76nw01#

AggregateFunction 没有任何状态。但是流窗口中使用的聚合状态(由 AggregateFunction )作为窗口状态的一部分进行检查点。
RichAggregateFunction 不能在窗口上下文中使用,并且 AggregateFunction 不能有自己的状态。这样设计是因为 AggregateFunction 允许使用状态描述符来定义 ValueState ,例如,该状态将不可合并——为了保持窗口api合理的干净,所有窗口状态都需要可合并(为了会话窗口)。 AggregatingState 你可以用在 KeyedProcessFunction ,例如。在这种情况下,您需要定义如何将元素聚合到累加器中(即 AggregatingState ),你可以用 AggregateFunction .

相关问题