您可以看到检查点的大小变得越来越大,并且永远不会减少。
的数据
在web UI中可以看到是由TumblingProcessingTimeWindows引起的,我发现checkpoint的大小几乎等于TumblingProcessingTimeWindows的received,作业运行OK,流程逻辑运行成功。
的
但是,在我的apply函数中,我没有使用state来保存任何东西,所以不可能是state的内容引起的。
ret_stream = (log_stream
.map(MyMapFunction())
.filter(lambda x: self.get_key(x) is not None)
.key_by(self.get_key, key_type=Types.STRING())
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.apply(MyWindowFunction()))
class MyWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]:
def apply(self, key: KEY, window: W, inputs: Iterable[IN]) -> Iterable[OUT]:
logger.info({
"key": key,
"data": [data for _, _, data in inputs]
})
return []
字符串
另一个表现是TaskManager Threads不断增长。
我不明白为什么检查点一直在增长。我设置了nowatermark和检查点间隔为1分钟。
2条答案
按热度按时间brvekthn1#
窗口确实保持状态。特别是,分配给每个窗口的事件被保存在一个列表中,直到窗口被触发,这些列表是检查点。大多数源和接收器也保持一些状态。
窗口保存的状态量将取决于总吞吐量,这似乎是您所观察到的。
如果可以重写窗口函数,使其通过
reduce
或aggregate
递增地完成工作(例如,如果您正在计算简单的聚合,如计数、总和、最小值、最大值或平均值),那么状态和检查点的大小可能会大幅缩小。wko9yo5t2#
我通过设置我重写的触发器函数来解决这个问题。
字符串
但我还是不明白为什么窗口功能关闭后输入数据不干净