我有 checkpointing
设置在我的flink作业中,它有2个滑动窗口(这些arent连接)和1个滚动窗口连接。我的想法是,我真的不需要为未来拯救国家 join
把自己当成拯救国家的救星 2
滑动车窗本身就足够了。这个 Join
最后是一个20-30gb的状态,导致作业延迟和崩溃,检查点永远不会保存。
我怎样才能做到这一点?
我在尝试这样的事情:
public class CustomJoin implements JoinFunction<A, A, A>, ListCheckPointed<A> {
@Override
public A join(A a, A b){
// Some irrelevant join logic
}
@Override
public List<A> snapshotState(long l, long l1) throws Exception {
return new ArrayList<>();
}
@Override
public void restoreState(List<A> list) throws Exception {
}
}
这实际上避免了为join存储状态吗?它被称为:
stream
.assignTimestampsAndWatermarks(...)
.join(secondStream.assingTimestampsAndWatermarks(...))
.where(KeySelector...)
.equalTo(KeySelector...)
.window(TumblingEventTimeWindows.of(Time.minutes(1L))
.trigger(EventTimeTrigger.create())
.apply(new CustomJoin());
这在实践中有效吗?避免存储状态的最佳方法是什么?
2条答案
按热度按时间bf1o4zei1#
根据我对flink的理解,checkpoint需要确保整个计算能够安全有效地恢复,所以这种全局状态是不可避免的。但是flink自己的检查点可以关闭(它基于abs算法,性能损失很小,我不推荐),但是使用flink为自定义快照提供的保存点,但是flink检查点是增量的。保存,保存点是完全保存。我建议您看一下这些资料:1、确定分布式系统全局状态的分布式快照2、用于分布式数据流的轻量级异步快照3、https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html 我认为这能很好地解决你的问题。
g52tjvyc2#
在窗口连接中,joinfunction由窗口操作符执行。它没有自己的状态。所以你所尝试的不会有帮助。
此外,滑动窗口使用的状态比您可能意识到的要多得多。每个重叠示例都有自己的窗口内容副本。例如,如果你有一个小时长的窗口可以滑动1分钟,那么每个事件会被复制60次。