Flink 计时器未触发

hyrbngr7  于 2023-01-10  发布在  Apache
关注(0)|答案(1)|浏览(403)

在Flink job中,我有一个KeyedProcessFunction
我已经实施了水印策略

val wmStrategy: WatermarkStrategy<MyInput> =
        WatermarkStrategy.forMonotonousTimestamps<MyInput>()
            .withTimestampAssigner { event: MyInput, _: Long -> event.getTimestampEvent() }

然后我把它应用到我的源数据上:

mysource.assignTimestampsAndWatermarks(wmStrategy)

processElement被调用时,定时器可以被注册为ctx.timerService().registerEventTimeTimer(timerWakeUpInstant.toEpochMilli()),并且在此之后ValueState被更新。更新成功。
下一次调用processElement时,valueState.value()返回null,而不是上次更新的值。不会在值状态上显式调用clear()。计时器永远不会触发。
目前,我正在一个“干净”的环境中进行测试,从一个文本文件中阅读,其中的数据只引用一个键,并且parallelism = 1运行到我的IDE中。
你能帮我吗?为什么状态无效?为什么计时器没有触发?

dz6r00yl

dz6r00yl1#

我自己也试过:直到已经注册定时器的X1 M1 N1 X接收到一个推进水印的消息,才调用X1 M0 N1 X。
使用事件时间计时器时,当当前水印前进到或超过时间戳时,将调用onTimer(...)方法
“当前”水印实际上是指操作员,而不是工作。这对我来说是误导,因为我认为它是集中的。
查看文档中的一些code sample,我们可以找到一个有用的注解,它可能会给予我们一个提示:
//通过使用水印提前操作符的事件时间来触发事件时间计时器

相关问题