【12】Flink 之 状态(State)管理与CheckPoint容错恢复

x33g5p2x  于2021-12-25 转载在 其他  
字(10.6k)|赞(0)|评价(0)|浏览(600)

1、状态(State)管理与恢复

1.1、状态(State)

  • 我们前面写的word count的例子,没有包含状态管理。如果一个task在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消息处理的语义上(at least once, exactly once),Flink引入了state和 checkpoint。
  • 首先区分一下两个概念
    state一般指一个具体的task/operator的状态【state数据默认保存在(Taskmanager的)java堆内存中】,即是将某个算子的运行状态的中间结果,保存到内存中,不太安全,机器重启或者机器挂了就丢失了。
    而checkpoint【可以理解为checkpoint是把state数据持久化存储了】,则表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态, checkpoint定时把state数据持久化存储,默认保存到HDFS。
    注意:task是Flink中执行的基本单位。operator指算子(transformation)。
  • State可以被记录,在失败的情况下数据还可以恢复
  • Flink中有两种基本类型的State
      1. K e y e d S t a t e \color{red}{Keyed State}KeyedState
      2. O p e r a t o r S t a t e \color{red}{Operator State}OperatorState

1.2、State形式存在

Keyed State和Operator State,可以以两种:

  • 原始状态(raw state)
  • 托管状态(managed state)
  1. 托管状态是由Flink框架管理的状态。
  2. 原始状态,由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。
  • 通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态。

1.3、State-Keyed State

  • State-Keyed State 就是基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state。
    stream.keyBy(…) // 调用keyBy()之后就会变成基于key 的stream,keyBy的返回值是 KeyStream

  • 有四种保存State-Keyed State的数据结构:

  1. ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值
  2. ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。
  3. ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。
  4. MapState<UK, UV>:即状态值为一个map。用户通过put或putAll方法添加元素。
  • 需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄。

1.4、State-Operator State

  • 与Key无关的State,与Operator绑定的state,整个operator只对应一个state
    保存state的数据结构:
      ListState
  • 举例来说,Flink中的Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。

2、容错恢复

2.1、状态容错

  • 依靠checkPoint机制
  • 保证exactly-once
     1. 只能保证Flink系统内的exactly-once
     2. 对于source和sink需要依赖外部的组件一同保证

2.2、状态容错-生成快照

其中:“Code”相当于程序中的算子任务,每个任务都会有一个State,通过checkpoint保存快照,保存起来。

2.2、状态容错-恢复快照

恢复快照时,“Code”通过保存的checkpoint进行恢复快照。

2.3、CheckPoint

2.3.1、CheckPoint介绍

  • 为了保证state的容错性,Flink需要对state进行checkpoint。
  • Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常
  • Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提:
    持久化的source,它需要支持在一定时间内重放事件。这种sources的典型例子是持久化的消息队列(比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等)。
    用于state的持久化存储,例如分布式文件系统(比如HDFS,S3,GFS等)。
    即持久化的有两个地方:Source 和 State 。

2.3.2、CheckPoint的配置

  • 默认checkpoint功能是disabled的,想要使用的时候需要先启用
  • checkpoint开启之后,默认的checkPointMode是Exactly-once
  • checkpoint的checkPointMode有两种,Exactly-once和At-least-once
  • Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终延迟为几毫秒)
  • 默认checkpoint功能是disabled的,想要使用的时候需要先启用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
env.enableCheckpointing(1000);
// 高级选项:
// 设置模式为exactly-once (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只允许进行一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

3、State Backend(状态的后端存储)

3.1、State Backend分类

  • 默认情况下,state会保存在taskmanager的内存中,checkpoint会存储在JobManager的内存中。

  • state 的store和checkpoint的位置取决于State Backend的配置
    env.setStateBackend(…)

  • 一共有三种State Backend:

  1. MemoryStateBackend
  2. FsStateBackend
  3. RocksDBStateBackend // 基于文件、磁盘的本地数据库

3.2、三种State Backend

  • MemoryStateBackend
    state数据保存在java堆内存中,执行checkpoint的时候,会把state的快照数据保存到jobmanager的内存中
    基于内存的state backend在生产环境下不建议使用
  • FsStateBackend
    state数据保存在taskmanager的内存中,执行checkpoint的时候,会把state的快照数据保存到配置的文件系统中
    可以使用hdfs等分布式文件系统
  • RocksDBStateBackend
    RocksDB跟上面的都略有不同,它会在本地文件系统中维护状态,state会直接写入本地rocksdb中。同时它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到filesystem中。fail over的时候从filesystem中恢复到本地
    RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用

3.3、修改State Backend

修改State Backend的两种方式:

  • 第一种:单任务调整
    修改当前任务代码
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));

或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖,如下:】

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.6.0</version>
</dependency>
  • 第二种:全局调整
    修改flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints

注意:state.backend的值可以是下面几种:

1、jobmanager (MemoryStateBackend)
2、filesystem (FsStateBackend)
3、rocksdb (RocksDBStateBackend)

4、CheckPoint实践

4.1、代码实现

package com.Streaming;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @Author: Henry
 * @Description: checkpoint的实现
 * @Date: Create in 2019/5/30 22:45
 */
public class SocketWindowWordCountCheckPoint {

    public static void main(String[] args) throws Exception{
        //获取需要的端口号
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("No port set. use default port 9999--java");
            port = 9999;
        }

        //获取flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
        env.enableCheckpointing(1000);
        // 高级选项:
        // 设置模式为exactly-once (这是默认值)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 同一时间只允许进行一个检查点
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoin
        //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
        //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.setStateBackend(new FsStateBackend("hdfs://master:9000/flink/checkpoints"));
    
        String hostname = "master";
        String delimiter = "\n";
        //连接socket获取输入的数据
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // a a c
        
        // a 1
        // a 1
        // c 1
        DataStream<WordWithCount> windowCounts = text.flatMap(
                new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String value, Collector<WordWithCount> out)
                    throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
        .timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2秒,指定时间间隔为1秒
        .sum("count");//在这里使用sum或者reduce都可以
        /*.reduce(new ReduceFunction<WordWithCount>() {
                            public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {

                                return new WordWithCount(a.word,a.count+b.count);
                            }
                        })*/
        //把数据打印到控制台并且设置并行度
        windowCounts.print().setParallelism(1);

        //这一行代码一定要实现,否则程序不执行
        env.execute("Socket window count");

    }

    public static class WordWithCount{
        public String word;
        public long count;
        public  WordWithCount(){}
        public WordWithCount(String word,long count){
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

4.2、运行结果

4.2.1、代码设置方式

env.setStateBackend(new FsStateBackend("hdfs://master:9000/flink/checkpoints"));

通过代码中设置检查点存储位置,应用于单任务的检查点设置容错恢复
步骤1:
  将工程代码通过Maven打jar包上传至集群中

步骤2:
  首先,通过Ncat打开一个端口,如9002,再通过yarn的方式提交flink集群运行

集群提交命令如下:

bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -c com.Streaming.SocketWindowWordCountCheckPoint flink-1.0-SNAPSHOT-jar-with-dependencies.jar --port 9002

提交运行成功会显示如下界面:

步骤3:
  通过Yarn的UI页面可以查看任务情况,如下:

步骤4:
  通过点击任务进入可查看任务信息,即检查点,如下:

  在其中的"Configuration"可以看到检查点的配置信息:

步骤5:
  通过终端发送单词,更新检查点存储,查看HDFS存储信息。通过hdfs命令可以查看hdfs路径下会不断更新,存储最新的检查点内容:

步骤6:
  查看终端发送单词的统计结果:

以上方法是通过代码进行的单任务配置检查点方法。

4.2.2、配置设置方式

通过配置方式设置检查点具有全局的作用,所有任务都会在设置的检查点目录下生成各自JobID对应文件夹的CheckPoint内容。
步骤1:
  配置 flink/confi/flink-conf.yaml文件

步骤6:
  查看检查点在hdfs上的存储情况:

5、CheckPoint多版本与恢复

5.1、保存多个Checkpoint

  • 默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint,而当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。但是,如果希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比如,我们发现最近4个小时数据记录处理有问题,希望将整个状态还原到4小时之前
  • Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数
    state.checkpoints.num-retained: 20
  • 这样设置以后就查看对应的Checkpoint在HDFS上存储的文件目录
    hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints
    如果希望回退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现

5.2、Checkpoint恢复

从Checkpoint进行恢复

  • 如果Flink程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个Checkpoint点进行恢复
    bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar
    程序正常运行后,还会按照Checkpoint配置进行运行,继续生成Checkpoint数据

恢复运行提交命令如下:

bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -s hdfs://master:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata -c com.Streaming.SocketWindowWordCountCheckPoint flink-1.0-SNAPSHOT-jar-with-dependencies.jar --port 9002

5.3、SavePoint

5.3.1、SavePoint介绍

  • Flink通过Savepoint功能可以做到程序升级后,继续从升级前的那个点开始执行计算,保证数据不中断
  • 全局,一致性快照。可以保存数据源offset,operator操作状态等信息
  • 可以从应用在过去任意做了savepoint的时刻开始继续消费

5.3.2、SavePoint使用

1:在flink-conf.yaml中配置Savepoint存储位置
不是必须设置,但是设置后,后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定Savepoint的位置

state.savepoints.dir: hdfs://namenode:9000/flink/savepoints

2:触发一个savepoint【直接触发或者在cancel的时候触发】

bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]【针对on yarn模式需要指定-yid参数】
bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【针对on yarn模式需要指定-yid参数】

3:从指定的savepoint启动job

bin/flink run -s savepointPath [runArgs]

5.4、CheckPoint vs SavePoint

  • CheckPoint
  1. 应用定时触发,用于保存状态,会过期
  2. 内部应用失败重启的时候使用
  • SavePoint
  1. 用户手动执行,是指向Checkpoint的指针,不会过期
  2. 在升级的情况下使用
  3. 注意:为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,强烈推荐程序员通过 uid(String) 方法手动的给算子赋予 ID,这些 ID 将用于确定每一个算子的状态范围。如果不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。只要这些 ID 没有改变就能从保存点(savepoint)将程序恢复回来。而这些自动生成的 ID 依赖于程序的结构,并且对代码的更改是很敏感的。因此,强烈建议用户手动的设置 ID。

相关文章