flink:如何持久化和恢复valuestate

zkure5ic  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(373)

我使用flink来丰富输入流

case class Input( key: String, message: String )

用预先计算的分数

case class Score( key: String, score: Int )

并产生一个输出

case class Output( key: String, message: String, score: Int )

输入流和分数流都从Kafka主题中读取,结果输出流也发布到Kafka

val processed = inputStream.connect( scoreStream )
                           .flatMap( new ScoreEnrichmentFunction )
                           .addSink( producer )

使用以下scoreenrichmentfunction:

class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output]
{
    val scoreStateDescriptor = new ValueStateDescriptor[Score]( "saved scores", classOf[Score] )
    lazy val scoreState: ValueState[Score] = getRuntimeContext.getState( scoreStateDescriptor )

    override def flatMap1( input: Input, out: Collector[Output] ): Unit = 
    {
        Option( scoreState.value ) match {
            case None => out.collect( Output( input.key, input.message, -1 ) )
            case Some( score ) => out.collect( Output( input.key, input.message, score.score ) )  
        }
    }

    override def flatMap2( score: Score, out: Collector[Output] ): Unit = 
    {
        scoreState.update( score )
    } 
}

这很有效。但是,如果我选择一个安全点并取消flink作业,当我从保存点恢复作业时,存储在valuestate中的分数将丢失。
据我所知,scoreenrichmentfunction似乎需要使用checkpointedfunction进行扩展

class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output] with CheckpointedFunction

但是我很难理解如何实现snapshotstate和initializestate方法来处理键控状态

override def snapshotState( context: FunctionSnapshotContext ): Unit = ???

override def initializeState( context: FunctionInitializationContext ): Unit = ???

请注意,我使用以下env:

val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism( 2 )
    env.setBufferTimeout( 1 )
    env.enableCheckpointing( 1000 )
    env.getCheckpointConfig.enableExternalizedCheckpoints( ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION )
    env.getCheckpointConfig.setCheckpointingMode( CheckpointingMode.EXACTLY_ONCE )
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints( 500 )
    env.getCheckpointConfig.setCheckpointTimeout( 60000 )
    env.getCheckpointConfig.setFailOnCheckpointingErrors( false )
    env.getCheckpointConfig.setMaxConcurrentCheckpoints( 1 )
okxuctiv

okxuctiv1#

我想我找到问题了。我尝试为检查点和保存点使用不同的目录,这导致保存点目录和fsstatebackend目录不同。
在中使用相同的目录

val backend = new FsStateBackend( "file:/data", true )
env.setStateBackend( backend )

当你拿到储蓄点的时候

bin/flink cancel d75f4712346cadb4df90ec06ef257636 -s file:/data

解决问题。

相关问题