状态检查点的flink sql

p1tboqfb  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(720)

当我使用flinksqlapi过程数据时。
重新启动应用程序 sum 结果未保存在检查点中。它仍以1开头。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StateBackend stateBackend = new FsStateBackend("file:///D:/d_backup/github/flink-best-practice/checkpoint");
env.enableCheckpointing(1000 * 60);
env.setStateBackend(stateBackend);

Table table = tableEnv.sqlQuery(
        "select sum(area_id) " +
        "from rtc_warning_gmys " +
        "where area_id = 1 " +
        "group by character_id,area_id,group_id,platform");

//   convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
//   The boolean field indicates the type of the change.
//   True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> dsRow = tableEnv.toRetractStream(table, Row.class);
dsRow.map(new MapFunction<Tuple2<Boolean,Row>, Object>() {
    @Override
    public Object map(Tuple2<Boolean, Row> booleanRowTuple2) throws Exception {
        if(booleanRowTuple2.f0) {
            System.out.println(booleanRowTuple2.f1.toString());
            return booleanRowTuple2.f1;
        }
        return null;
    }
});

env.execute("Kafka table select");

登录方式:
1 2 3 ... ... 100
重新启动应用程序它仍然启动:1 2 3。。。
我认为总和值将存储在checkpint文件中,重新启动应用程序可以从checkpint读取最后的结果,如:
101 102 103 ... 120

4bbkushb

4bbkushb1#

一些可能性:
作业运行的时间是否足以完成检查点?仅仅因为作业产生了输出并不意味着检查点已经完成。我看到您已将检查点配置为每分钟发生一次,检查点可能需要一些时间才能完成。
工作是怎么停止的?除非检查点已外部化,否则在取消作业时将删除检查点。
作业是如何重新启动的?它是从检查点(自动)恢复的,还是从外部化的检查点或保存点恢复的,还是从零开始重新启动的?
这种实验最容易通过命令行完成。例如,你可能,
编写一个使用检查点并具有重启策略的应用程序(例如。, env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000)) )
启动本地群集
“flink run-d app.jar”启动作业
等待至少一个检查点完成
“kill-9任务管理器pid”导致失败
“taskmanager.sh start”以允许作业从检查点恢复

相关问题