Flink 在tumblingProcessingTimeWindows中不使用状态,检查点的大小会越来越大

velaa5lx  于 12个月前  发布在  Apache
关注(0)|答案(2)|浏览(164)

您可以看到检查点的大小变得越来越大,并且永远不会减少。


的数据
在web UI中可以看到是由TumblingProcessingTimeWindows引起的,我发现checkpoint的大小几乎等于TumblingProcessingTimeWindows的received,作业运行OK,流程逻辑运行成功。



但是,在我的apply函数中,我没有使用state来保存任何东西,所以不可能是state的内容引起的。

ret_stream = (log_stream
              .map(MyMapFunction())
              .filter(lambda x: self.get_key(x) is not None)
              .key_by(self.get_key, key_type=Types.STRING())
              .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
              .apply(MyWindowFunction()))

class MyWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]:
    def apply(self, key: KEY, window: W, inputs: Iterable[IN]) -> Iterable[OUT]:
        logger.info({
            "key": key,
            "data": [data for _, _, data in inputs]
        })
        return []

字符串
另一个表现是TaskManager Threads不断增长。

我不明白为什么检查点一直在增长。我设置了nowatermark和检查点间隔为1分钟。

brvekthn

brvekthn1#

窗口确实保持状态。特别是,分配给每个窗口的事件被保存在一个列表中,直到窗口被触发,这些列表是检查点。大多数源和接收器也保持一些状态。
窗口保存的状态量将取决于总吞吐量,这似乎是您所观察到的。
如果可以重写窗口函数,使其通过reduceaggregate递增地完成工作(例如,如果您正在计算简单的聚合,如计数、总和、最小值、最大值或平均值),那么状态和检查点的大小可能会大幅缩小。

wko9yo5t

wko9yo5t2#

我通过设置我重写的触发器函数来解决这个问题。

class MyProcessingTimeTrigger(ProcessingTimeTrigger):
    def on_processing_time(self,
                           time: int,
                           window: TimeWindow,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        return TriggerResult.FIRE_AND_PURGE

ret_stream = (log_stream
                      .map(MyMapFunction())
                      .filter(lambda x: self.get_key(x) is not None)
                      .key_by(self.get_key, key_type=Types.STRING())
                      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                      .trigger(MyProcessingTimeTrigger())
                      .apply(MyWindowFunction()))

字符串
但我还是不明白为什么窗口功能关闭后输入数据不干净

相关问题