如何使用ApacheFlink对流进行会话化?

4si2a6ki  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(304)

我想对这个流进行会话:1,1,1,2,2,2,2,3,3,3,3,3,0,3,3,3,3,3,5。。。对于这些会话:

1,1,1
2,2,2,2,2
3,3,3,3,3,3,3
0
3,3,3
5

我编写了customtrigger来检测流元素何时从1变为2(2变为3,3变为0等等),然后触发触发器。但这不是解决方案,因为当我处理2的第一个元素并触发触发器时,窗口将是[1,1,1,2],但我需要在1的最后一个元素上触发触发器。
以下是自定义触发器类中OneElement函数的pesudo:

override def onElement(element: Session, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
    if (prevState == element.value) {
      prevState = element.value
      TriggerResult.CONTINUE
    } else {
      prevState = element.value
      TriggerResult.FIRE
    }
}

我怎样才能解决这个问题?

wqnecbli

wqnecbli1#

我想一个 FlatMapFunction 用一个 ListState 是实现这个用例的最简单的方法。
当新元素到达时(即 flatMap() 方法),检查值是否更改。如果值没有更改,则将元素附加到状态。如果值更改,则将当前列表状态作为会话发出,清除列表,并将新元素作为列表状态的第一个元素插入。
但是,您应该记住,这是假定元素的顺序保持不变的。flink确保在一个分区内,也就是说,只要元素没有被洗牌,所有操作符都以相同的并行度运行。

相关问题