在flink中,我正在使用 readTextFile
以及申请 SlidingProcessingTimeWindows.of(Time.milliseconds(60), Time.milliseconds(60))
60毫秒,滑动60毫秒。在窗口流中,我计算元组第二个字段的平均值。我的文本文件包含1100行,每行是tuple(string,integer)。我已将并行度设置为1,并在元组的第一个字段上键入消息。
当我运行代码时,每次都会得到不同的答案。我的意思是,有时它读取整个文件,有时它读取文件的前几行。它是否与滑动量的窗口大小有关?如何找出这种关系,以便我可以决定窗口的大小和滑动量?
1条答案
按热度按时间d8tt03nd1#
alpinegizmo评论中的答案是正确的。我会在这里补充一些细节。
Flink将时间窗口与新纪元的开始对齐(
1970-01-01-00:00:00
). 这意味着具有1小时窗口的窗口操作符每新的一小时(即,在00:00
,01:00
,02:00
,…)而不是第一个到达的记录。处理时间窗口基于系统的当前时间进行评估。如上所述,这意味着可以处理的数据量取决于操作员运行的机器的处理资源(硬件、cpu/io负载等)。因此,处理时间窗不能产生可靠和一致的结果。
在你的案例中,两种描述的效果可能会导致不同工作的结果不一致。根据何时开始作业,数据将被分配到不同的窗口(如果第一条记录刚好在第一个60毫秒窗口关闭之前到达,则窗口中只有此元素)。根据机器的io负载,访问和读取文件可能需要更多或更少的时间。
如果想要得到一致的结果,就需要使用事件时间。在这种情况下,基于在数据中编码的时间来处理记录,即,结果仅取决于数据而不取决于诸如作业的开始时间或处理机器的负载之类的外部影响。