我的数据流是从一个定制的sourcefunction派生的,它以确定的顺序发出窗口大小的字符串序列。其目的是在keyedstream上创建滑动窗口,以便基于eventtime对累积的字符串进行处理。为了分配eventtime和水印,我将一个带有PeriodicWatermarks的赋值器附加到流。滑动窗口使用自定义ProcessWindow函数进行处理。
env.setStreamTimeCharacteristic(EventTime)
val seqStream = env.addSource(Seqstream)
.assignTimestampsAndWatermarks(SeqTimeStampExtractor())
.keyBy(getEventtimeKey)
.window(SlidingEventTimeWindows.of(Time.milliseconds(windowSize), Time.milliseconds(slideSize)))
val result = seqStream.process(ProcessSeqWindow(target1))
我的具有周期性水印的赋值器如下所示:
class FASTATimeStampExtractor : AssignerWithPeriodicWatermarks<FASTAstring> {
var waterMark = 9999L
override fun extractTimestamp(element: FASTAstring, previousElementTimestamp: Long): Long {
return element.f1
}
override fun getCurrentWatermark(): Watermark? {
waterMark += 1
return Watermark(waterMark)
}
}
换句话说,源发出的每个元素都应该有自己的eventime,并且应该发出水印,在该时间内不允许再发生任何事件。在调试器中单步遍历流,表示按预期生成eventtime/watmarres。
我的期望是processseqwindow.run()应该在eventtime中使用与时间窗口成比例的元素(例如10毫秒)来调用。但是,我观察到run()是用单个元素多次调用的,并且是按eventtime的任意顺序调用的。当我强制parallelism为1时,这种行为仍然存在。我的问题是,这可能是由每个窗口上的多个触发事件引起的,还是有其他可能的解释?如何调试原因?
谢谢
1条答案
按热度按时间zaq34kh61#
作业中的水印的作用是触发滑动事件时间窗口的关闭。为了正确地扮演这个角色,它们应该基于事件中的时间戳,而不是一些任意的常量(9999l)。同一个对象负责提取时间戳并提供水印的原因是,该对象可以基于对事件流中时间戳的观察来创建水印。因此,除非您的事件时间戳也是基于增加一个类似的计数器,否则这可能解释了您看到的一些行为。
另一个问题是,虽然为每个事件调用extracttimestamp,但在周期性水印赋值器中,getcurrentwatermark方法在单独的线程中每200毫秒调用一次(默认情况下)。如果您希望在每个事件之后都添加水印,则需要使用带有标点水印的赋值器,尽管这样做有点反模式(因为有那么多水印会增加开销)。
如果你的时间戳完全是人造的,你可能会发现一个滑动计数窗口更适合你正在做的事情。