在Flink job中,我有一个KeyedProcessFunction
。
我已经实施了水印策略
val wmStrategy: WatermarkStrategy<MyInput> =
WatermarkStrategy.forMonotonousTimestamps<MyInput>()
.withTimestampAssigner { event: MyInput, _: Long -> event.getTimestampEvent() }
然后我把它应用到我的源数据上:
mysource.assignTimestampsAndWatermarks(wmStrategy)
当processElement
被调用时,定时器可以被注册为ctx.timerService().registerEventTimeTimer(timerWakeUpInstant.toEpochMilli())
,并且在此之后ValueState
被更新。更新成功。
下一次调用processElement
时,valueState.value()
返回null
,而不是上次更新的值。不会在值状态上显式调用clear()
。计时器永远不会触发。
目前,我正在一个“干净”的环境中进行测试,从一个文本文件中阅读,其中的数据只引用一个键,并且parallelism = 1
运行到我的IDE中。
你能帮我吗?为什么状态无效?为什么计时器没有触发?
1条答案
按热度按时间dz6r00yl1#
我自己也试过:直到已经注册定时器的X1 M1 N1 X接收到一个推进水印的消息,才调用X1 M0 N1 X。
使用事件时间计时器时,当当前水印前进到或超过时间戳时,将调用onTimer(...)方法
“当前”水印实际上是指操作员,而不是工作。这对我来说是误导,因为我认为它是集中的。
查看文档中的一些code sample,我们可以找到一个有用的注解,它可能会给予我们一个提示:
//通过使用水印提前操作符的事件时间来触发事件时间计时器