flink如何不为操作员保存状态?

41ik7eoe  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(295)

我有 checkpointing 设置在我的flink作业中,它有2个滑动窗口(这些arent连接)和1个滚动窗口连接。我的想法是,我真的不需要为未来拯救国家 join 把自己当成拯救国家的救星 2 滑动车窗本身就足够了。这个 Join 最后是一个20-30gb的状态,导致作业延迟和崩溃,检查点永远不会保存。
我怎样才能做到这一点?
我在尝试这样的事情:

public class CustomJoin implements JoinFunction<A, A, A>, ListCheckPointed<A> {

@Override
public A join(A a, A b){
 // Some irrelevant join logic
}

@Override
    public List<A> snapshotState(long l, long l1) throws Exception {
      return new ArrayList<>();
    }

    @Override
    public void restoreState(List<A> list) throws Exception {

    }
}

这实际上避免了为join存储状态吗?它被称为:

stream
.assignTimestampsAndWatermarks(...)
.join(secondStream.assingTimestampsAndWatermarks(...))
.where(KeySelector...)
.equalTo(KeySelector...)
.window(TumblingEventTimeWindows.of(Time.minutes(1L))
.trigger(EventTimeTrigger.create())
.apply(new CustomJoin());

这在实践中有效吗?避免存储状态的最佳方法是什么?

bf1o4zei

bf1o4zei1#

根据我对flink的理解,checkpoint需要确保整个计算能够安全有效地恢复,所以这种全局状态是不可避免的。但是flink自己的检查点可以关闭(它基于abs算法,性能损失很小,我不推荐),但是使用flink为自定义快照提供的保存点,但是flink检查点是增量的。保存,保存点是完全保存。我建议您看一下这些资料:1、确定分布式系统全局状态的分布式快照2、用于分布式数据流的轻量级异步快照3、https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html 我认为这能很好地解决你的问题。

g52tjvyc

g52tjvyc2#

在窗口连接中,joinfunction由窗口操作符执行。它没有自己的状态。所以你所尝试的不会有帮助。
此外,滑动窗口使用的状态比您可能意识到的要多得多。每个重叠示例都有自己的窗口内容副本。例如,如果你有一个小时长的窗口可以滑动1分钟,那么每个事件会被复制60次。

相关问题