当在写入cassandra的flink作业中使用检查点时,如果由于连接问题导致写入失败,则该作业将失败,并在某个时间间隔后重新启动。
当记录失败时,此作业从何处开始?它是选择下一个要处理的记录,还是重置偏移量并尝试重新处理失败的记录?
我的检查点配置如下所示,
try{
env.setStateBackend(new RocksDBStateBackend(props.getFlinkCheckpointDataUri(), true));
env.enableCheckpointing(10000, EXACTLY_ONCE); //10 seconds
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}
catch(Exception e){
System.out.println("Failed to prepare stream execution environment");
}
1条答案
按热度按时间inn6fuwd1#
flink保证至少为cassandra sink交付一次(如果您对c*示例的更新请求是幂等的,则正好一次;这意味着可以多次应用更新而不更改结果(如果启用了检查点[ref]。换句话说,如果记录未能执行,则包含这些记录的快照的检查点将不会提交。因此,将完全重试失败检查点的记录。
这是因为cassandrasink有一个checkpointcommitter,它在某些资源中存储有关已完成的检查点的附加信息。此信息用于防止在发生故障时完全重放上次完成的检查点[ref]。