使用cassandra write进行检查点

v8wbuo2f  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(432)

当在写入cassandra的flink作业中使用检查点时,如果由于连接问题导致写入失败,则该作业将失败,并在某个时间间隔后重新启动。
当记录失败时,此作业从何处开始?它是选择下一个要处理的记录,还是重置偏移量并尝试重新处理失败的记录?
我的检查点配置如下所示,

try{
        env.setStateBackend(new RocksDBStateBackend(props.getFlinkCheckpointDataUri(), true));
        env.enableCheckpointing(10000, EXACTLY_ONCE); //10 seconds

        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
catch(Exception e){
        System.out.println("Failed to prepare stream execution environment");
    }
inn6fuwd

inn6fuwd1#

flink保证至少为cassandra sink交付一次(如果您对c*示例的更新请求是幂等的,则正好一次;这意味着可以多次应用更新而不更改结果(如果启用了检查点[ref]。换句话说,如果记录未能执行,则包含这些记录的快照的检查点将不会提交。因此,将完全重试失败检查点的记录。
这是因为cassandrasink有一个checkpointcommitter,它在某些资源中存储有关已完成的检查点的附加信息。此信息用于防止在发生故障时完全重放上次完成的检查点[ref]。

相关问题